[ 
https://issues.apache.org/jira/browse/BEAM-9577?focusedWorklogId=415672&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415672
 ]

ASF GitHub Bot logged work on BEAM-9577:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Apr/20 19:04
            Start Date: 03/Apr/20 19:04
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on pull request #11305: [BEAM-9577] 
Update container boot code to stage from dependencies, if present.
URL: https://github.com/apache/beam/pull/11305#discussion_r403251324
 
 

 ##########
 File path: sdks/go/pkg/beam/artifact/materialize.go
 ##########
 @@ -31,15 +31,148 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+       "github.com/golang/protobuf/proto"
+)
+
+// TODO(lostluck): 2018/05/28 Extract these from their enum descriptors in the 
pipeline_v1 proto
+const (
+       URNStagingTo      = "beam:artifact:role:staging_to:v1"
+       NoArtifactsStaged = "__no_artifacts_staged__"
 )
 
 // Materialize is a convenience helper for ensuring that all artifacts are
 // present and uncorrupted. It interprets each artifact name as a relative
 // path under the dest directory. It does not retrieve valid artifacts already
 // present.
-func Materialize(ctx context.Context, endpoint string, rt string, dest string) 
([]*pb.ArtifactMetadata, error) {
+// TODO(BEAM-9577): Return a mapping of filename to dependency, rather than 
[]*pb.ArtifactMetadata.
+// TODO(BEAM-9577): Leverage richness of roles rather than magic names to 
understand artifacts.
+func Materialize(ctx context.Context, endpoint string, dependencies 
[]*pipeline_v1.ArtifactInformation, rt string, dest string) 
([]*pb.ArtifactMetadata, error) {
+       if len(dependencies) > 0 {
+               return newMaterialize(ctx, endpoint, dependencies, dest)
+       } else if rt == "" || rt == NoArtifactsStaged {
+               return []*pb.ArtifactMetadata{}, nil
+       } else {
+               return legacyMaterialize(ctx, endpoint, rt, dest)
+       }
+}
+
+func newMaterialize(ctx context.Context, endpoint string, dependencies 
[]*pipeline_v1.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, 
error) {
+       cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
+       if err != nil {
+               return nil, err
+       }
+       defer cc.Close()
+
+       return newMaterializeWithClient(ctx, 
pb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
+}
+
+func newMaterializeWithClient(ctx context.Context, client 
pb.ArtifactRetrievalServiceClient, dependencies 
[]*pipeline_v1.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, 
error) {
+       resolution, err := client.ResolveArtifact(ctx, 
&pb.ResolveArtifactRequest{Artifacts: dependencies})
+       if err != nil {
+               return nil, err
+       }
+
+       var md []*pb.ArtifactMetadata
+       var list []retrievable
+       for _, dep := range resolution.Replacements {
+               path, err := extractStagingToPath(dep)
+               if err != nil {
+                       return nil, err
+               }
+               md = append(md, &pb.ArtifactMetadata{
+                       Name: path,
+               })
+
+               list = append(list, &artifact{
+                       client: client,
+                       dep:    dep,
+               })
+       }
+
+       return md, MultiRetrieve(ctx, 10, list, dest)
+}
+
+func extractStagingToPath(artifact *pipeline_v1.ArtifactInformation) (string, 
error) {
+       if artifact.RoleUrn != URNStagingTo {
+               return "", errors.Errorf("Unsupported artifact role %s", 
artifact.RoleUrn)
+       }
+       role := pipeline_v1.ArtifactStagingToRolePayload{}
+       if err := proto.Unmarshal(artifact.RolePayload, &role); err != nil {
+               return "", err
+       }
+       return role.StagedName, nil
+}
+
+type artifact struct {
+       client pb.ArtifactRetrievalServiceClient
+       dep    *pipeline_v1.ArtifactInformation
+}
+
+func (a artifact) retrieve(ctx context.Context, dest string) error {
+       path, err := extractStagingToPath(a.dep)
+       if err != nil {
+               return err
+       }
+
+       filename := filepath.Join(dest, filepath.FromSlash(path))
+
+       _, err = os.Stat(filename)
+       if err == nil {
+               if err = os.Remove(filename); err != nil {
+                       return errors.Errorf("failed to delete: %v (remove: 
%v)", filename, err)
+               }
+       } else if !os.IsNotExist(err) {
+               return errors.Wrapf(err, "failed to stat %v", filename)
+       }
+
+       stream, err := a.client.GetArtifact(ctx, 
&pb.GetArtifactRequest{Artifact: a.dep})
+       if err != nil {
+               return err
+       }
+
+       fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 
0755)
+       if err != nil {
+               return err
+       }
+       w := bufio.NewWriter(fd)
+
+       err = writeChunks(stream, w)
+       if err != nil {
+               fd.Close() // drop any buffered content
+               return errors.Wrapf(err, "failed to retrieve chunk for %v", 
filename)
+       }
+       if err := w.Flush(); err != nil {
+               fd.Close()
+               return errors.Wrapf(err, "failed to flush chunks for %v", 
filename)
+       }
+       if err := fd.Close(); err != nil {
+               return err
+       }
+
+       return nil
 
 Review comment:
   ```suggestion
        return fd.Close()
   ```
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 415672)
    Time Spent: 9h 20m  (was: 9h 10m)

> Update artifact staging and retrieval protocols to be dependency aware.
> -----------------------------------------------------------------------
>
>                 Key: BEAM-9577
>                 URL: https://issues.apache.org/jira/browse/BEAM-9577
>             Project: Beam
>          Issue Type: Improvement
>          Components: beam-model
>            Reporter: Robert Bradshaw
>            Assignee: Robert Bradshaw
>            Priority: Major
>          Time Spent: 9h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to