[ 
https://issues.apache.org/jira/browse/BEAM-9578?focusedWorklogId=415286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415286
 ]

ASF GitHub Bot logged work on BEAM-9578:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Apr/20 02:23
            Start Date: 03/Apr/20 02:23
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #11205: [BEAM-9578] 
Enumerating artifacts is too expensive in Java
URL: https://github.com/apache/beam/pull/11205#discussion_r402700601
 
 

 ##########
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 ##########
 @@ -214,24 +220,87 @@ public static Environment createProcessEnvironment(
       pathsToStage.addAll(stagingFiles);
     }
 
-    ImmutableList.Builder<ArtifactInformation> filesToStage = 
ImmutableList.builder();
+    ImmutableList.Builder<Supplier<ArtifactInformation>> lazyArtifactsBuilder =
+        ImmutableList.builder();
     for (String path : pathsToStage) {
       File file = new File(path);
-      if (new File(path).exists()) {
-        // Spurious items get added to the classpath. Filter by just those 
that exist.
-        if (file.isDirectory()) {
-          // Zip up directories so we can upload them to the artifact service.
-          try {
-            filesToStage.add(createArtifactInformation(zipDirectory(file)));
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        } else {
-          filesToStage.add(createArtifactInformation(file));
-        }
+      // Spurious items get added to the classpath. Filter by just those that 
exist.
+      if (file.exists()) {
+        ArtifactInformation.Builder artifactBuilder = 
ArtifactInformation.newBuilder();
+        
artifactBuilder.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE));
+        
artifactBuilder.setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO));
+        artifactBuilder.setRolePayload(
+            RunnerApi.ArtifactStagingToRolePayload.newBuilder()
+                .setStagedName(createStagingFileName(file))
+                .build()
+                .toByteString());
+        lazyArtifactsBuilder.add(
+            file.isDirectory()
+                ? () -> {
+                  File zippedFile;
+                  HashCode hashCode;
+                  try {
+                    zippedFile = zipDirectory(file);
+                    hashCode = 
Files.asByteSource(zippedFile).hash(Hashing.sha256());
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return artifactBuilder
+                      .setTypePayload(
+                          RunnerApi.ArtifactFilePayload.newBuilder()
+                              .setPath(zippedFile.getPath())
+                              .setSha256(hashCode.toString())
+                              .build()
+                              .toByteString())
+                      .build();
+                }
+                : () -> {
+                  HashCode hashCode;
+                  try {
+                    hashCode = Files.asByteSource(file).hash(Hashing.sha256());
+                  } catch (IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                  return artifactBuilder
+                      .setTypePayload(
+                          RunnerApi.ArtifactFilePayload.newBuilder()
+                              .setPath(file.getPath())
+                              .setSha256(hashCode.toString())
+                              .build()
+                              .toByteString())
+                      .build();
+                });
       }
     }
-    return filesToStage.build();
+
+    List<Supplier<ArtifactInformation>> lazyArtifacts = 
lazyArtifactsBuilder.build();
+    String id = UUID.randomUUID().toString();
+    DefaultArtifactResolver.INSTANCE.register(
+        (info) -> {
+          if 
(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
+            RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
+            try {
+              deferredArtifactPayload =
+                  
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
+            } catch (InvalidProtocolBufferException e) {
+              throw new RuntimeException("Error parsing deferred artifact 
payload.", e);
+            }
+            if (id.equals(deferredArtifactPayload.getKey())) {
+              return 
lazyArtifacts.stream().map(Supplier::get).collect(Collectors.toList());
+            } else {
+              return ImmutableList.of();
+            }
+          } else {
+            return ImmutableList.of();
+          }
+        });
+
+    return ImmutableList.of(
+        ArtifactInformation.newBuilder()
+            .setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
+            .setTypePayload(
+                
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(id).build().toByteString())
 
 Review comment:
   ```suggestion
       String key = UUID.randomUUID().toString();
       DefaultArtifactResolver.INSTANCE.register(
           (info) -> {
             if 
(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
               RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
               try {
                 deferredArtifactPayload =
                     
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
               } catch (InvalidProtocolBufferException e) {
                 throw new RuntimeException("Error parsing deferred artifact 
payload.", e);
               }
               if (key.equals(deferredArtifactPayload.getKey())) {
                 return 
lazyArtifacts.stream().map(Supplier::get).collect(Collectors.toList());
               } else {
                 return ImmutableList.of();
               }
             } else {
               return ImmutableList.of();
             }
           });
   
       return ImmutableList.of(
           ArtifactInformation.newBuilder()
               .setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
               .setTypePayload(
                   
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(key).build().toByteString())
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 415286)
    Time Spent: 5h  (was: 4h 50m)

> Enumerating artifacts is too expensive in Java
> ----------------------------------------------
>
>                 Key: BEAM-9578
>                 URL: https://issues.apache.org/jira/browse/BEAM-9578
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Luke Cwik
>            Assignee: Heejong Lee
>            Priority: Critical
>              Labels: portability
>             Fix For: 2.21.0
>
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> There are a lot of places (e.g. *ParDoTranslation#getParDoPayload*) which 
> effectively call *Environments#createOrGetDefaultEnvironment* which causes 
> [artifacts to be 
> computed|https://github.com/apache/beam/blob/fc6cef9972780ca6b7525d4aadd65a8344221f1b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java#L114].
> This leads to zipping directories for non-jar dependencies.
> Similar problems may exist for Python/Go.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to