This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
     new a4dcf29  Update materialize.go
a4dcf29 is described below

commit a4dcf29f652eadec083641477e7ba9a83a0f6d6a
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:17:31 2020 -0700

    Update materialize.go
---
 sdks/go/pkg/beam/artifact/materialize.go | 52 ++++++++++++++++----------------
 1 file changed, 26 insertions(+), 26 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/materialize.go b/sdks/go/pkg/beam/artifact/materialize.go
index 859381b..8f7c3af 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -30,8 +30,8 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-       pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
-       ppb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
        "github.com/golang/protobuf/proto"
@@ -49,40 +49,40 @@ const (
 // present.
 // TODO(BEAM-9577): Return a mapping of filename to dependency, rather than []*pb.ArtifactMetadata.
 // TODO(BEAM-9577): Leverage richness of roles rather than magic names to understand artifacts.
-func Materialize(ctx context.Context, endpoint string, dependencies 
[]*ppb.ArtifactInformation, rt string, dest string) ([]*pb.ArtifactMetadata, 
error) {
+func Materialize(ctx context.Context, endpoint string, dependencies 
[]*pipepb.ArtifactInformation, rt string, dest string) 
([]*jobpb.ArtifactMetadata, error) {
        if len(dependencies) > 0 {
                return newMaterialize(ctx, endpoint, dependencies, dest)
        } else if rt == "" || rt == NoArtifactsStaged {
-               return []*pb.ArtifactMetadata{}, nil
+               return []*jobpb.ArtifactMetadata{}, nil
        } else {
                return legacyMaterialize(ctx, endpoint, rt, dest)
        }
 }
 
-func newMaterialize(ctx context.Context, endpoint string, dependencies 
[]*ppb.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, error) {
+func newMaterialize(ctx context.Context, endpoint string, dependencies 
[]*pipepb.ArtifactInformation, dest string) ([]*jobpb.ArtifactMetadata, error) {
        cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
        if err != nil {
                return nil, err
        }
        defer cc.Close()
 
-       return newMaterializeWithClient(ctx, 
pb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
+       return newMaterializeWithClient(ctx, 
jobpb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
 }
 
-func newMaterializeWithClient(ctx context.Context, client 
pb.ArtifactRetrievalServiceClient, dependencies []*ppb.ArtifactInformation, 
dest string) ([]*pb.ArtifactMetadata, error) {
-       resolution, err := client.ResolveArtifacts(ctx, 
&pb.ResolveArtifactsRequest{Artifacts: dependencies})
+func newMaterializeWithClient(ctx context.Context, client 
jobpb.ArtifactRetrievalServiceClient, dependencies 
[]*pipepb.ArtifactInformation, dest string) ([]*jobpb.ArtifactMetadata, error) {
+       resolution, err := client.ResolveArtifacts(ctx, 
&jobpb.ResolveArtifactsRequest{Artifacts: dependencies})
        if err != nil {
                return nil, err
        }
 
-       var md []*pb.ArtifactMetadata
+       var md []*jobpb.ArtifactMetadata
        var list []retrievable
        for _, dep := range resolution.Replacements {
                path, err := extractStagingToPath(dep)
                if err != nil {
                        return nil, err
                }
-               md = append(md, &pb.ArtifactMetadata{
+               md = append(md, &jobpb.ArtifactMetadata{
                        Name: path,
                })
 
@@ -95,11 +95,11 @@ func newMaterializeWithClient(ctx context.Context, client 
pb.ArtifactRetrievalSe
        return md, MultiRetrieve(ctx, 10, list, dest)
 }
 
-func extractStagingToPath(artifact *ppb.ArtifactInformation) (string, error) {
+func extractStagingToPath(artifact *pipepb.ArtifactInformation) (string, 
error) {
        if artifact.RoleUrn != URNStagingTo {
                return "", errors.Errorf("Unsupported artifact role %s", 
artifact.RoleUrn)
        }
-       role := ppb.ArtifactStagingToRolePayload{}
+       role := pipepb.ArtifactStagingToRolePayload{}
        if err := proto.Unmarshal(artifact.RolePayload, &role); err != nil {
                return "", err
        }
@@ -107,8 +107,8 @@ func extractStagingToPath(artifact 
*ppb.ArtifactInformation) (string, error) {
 }
 
 type artifact struct {
-       client pb.ArtifactRetrievalServiceClient
-       dep    *ppb.ArtifactInformation
+       client jobpb.ArtifactRetrievalServiceClient
+       dep    *pipepb.ArtifactInformation
 }
 
 func (a artifact) retrieve(ctx context.Context, dest string) error {
@@ -128,7 +128,7 @@ func (a artifact) retrieve(ctx context.Context, dest 
string) error {
                return errors.Wrapf(err, "failed to stat %v", filename)
        }
 
-       stream, err := a.client.GetArtifact(ctx, 
&pb.GetArtifactRequest{Artifact: a.dep})
+       stream, err := a.client.GetArtifact(ctx, 
&jobpb.GetArtifactRequest{Artifact: a.dep})
        if err != nil {
                return err
        }
@@ -151,7 +151,7 @@ func (a artifact) retrieve(ctx context.Context, dest 
string) error {
        return fd.Close()
 }
 
-func writeChunks(stream pb.ArtifactRetrievalService_GetArtifactClient, w 
io.Writer) error {
+func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w 
io.Writer) error {
        for {
                chunk, err := stream.Recv()
                if err == io.EOF {
@@ -168,16 +168,16 @@ func writeChunks(stream 
pb.ArtifactRetrievalService_GetArtifactClient, w io.Writ
        return nil
 }
 
-func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest 
string) ([]*pb.ArtifactMetadata, error) {
+func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest 
string) ([]*jobpb.ArtifactMetadata, error) {
        cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
        if err != nil {
                return nil, err
        }
        defer cc.Close()
 
-       client := pb.NewLegacyArtifactRetrievalServiceClient(cc)
+       client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc)
 
-       m, err := client.GetManifest(ctx, 
&pb.GetManifestRequest{RetrievalToken: rt})
+       m, err := client.GetManifest(ctx, 
&jobpb.GetManifestRequest{RetrievalToken: rt})
        if err != nil {
                return nil, errors.Wrap(err, "failed to get manifest")
        }
@@ -249,7 +249,7 @@ type retrievable interface {
 }
 
 // LegacyMultiRetrieve is exported for testing.
-func LegacyMultiRetrieve(ctx context.Context, client 
pb.LegacyArtifactRetrievalServiceClient, cpus int, list []*pb.ArtifactMetadata, 
rt string, dest string) error {
+func LegacyMultiRetrieve(ctx context.Context, client 
jobpb.LegacyArtifactRetrievalServiceClient, cpus int, list 
[]*jobpb.ArtifactMetadata, rt string, dest string) error {
        var rlist []retrievable
        for _, md := range list {
                rlist = append(rlist, &legacyArtifact{
@@ -275,7 +275,7 @@ func (a legacyArtifact) retrieve(ctx context.Context, dest 
string) error {
 // retrieved. If not, it retrieves into the dest directory. It overwrites any
 // previous retrieval attempt and may leave a corrupt/partial local file on
 // failure.
-func Retrieve(ctx context.Context, client 
pb.LegacyArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, rt string, 
dest string) error {
+func Retrieve(ctx context.Context, client 
jobpb.LegacyArtifactRetrievalServiceClient, a *jobpb.ArtifactMetadata, rt 
string, dest string) error {
        filename := filepath.Join(dest, filepath.FromSlash(a.Name))
 
        _, err := os.Stat(filename)
@@ -310,8 +310,8 @@ func Retrieve(ctx context.Context, client 
pb.LegacyArtifactRetrievalServiceClien
 // It validates that the given SHA256 matches the content and fails otherwise.
 // It expects the file to not exist, but does not clean up on failure and
 // may leave a corrupt file.
-func retrieve(ctx context.Context, client 
pb.LegacyArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, rt string, 
filename string) error {
-       stream, err := client.GetArtifact(ctx, 
&pb.LegacyGetArtifactRequest{Name: a.Name, RetrievalToken: rt})
+func retrieve(ctx context.Context, client 
jobpb.LegacyArtifactRetrievalServiceClient, a *jobpb.ArtifactMetadata, rt 
string, filename string) error {
+       stream, err := client.GetArtifact(ctx, 
&jobpb.LegacyGetArtifactRequest{Name: a.Name, RetrievalToken: rt})
        if err != nil {
                return err
        }
@@ -342,7 +342,7 @@ func retrieve(ctx context.Context, client 
pb.LegacyArtifactRetrievalServiceClien
        return nil
 }
 
-func retrieveChunks(stream 
pb.LegacyArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, 
error) {
+func retrieveChunks(stream 
jobpb.LegacyArtifactRetrievalService_GetArtifactClient, w io.Writer) (string, 
error) {
        sha256W := sha256.New()
        for {
                chunk, err := stream.Recv()
@@ -398,8 +398,8 @@ func slice2queue(list []retrievable) chan retrievable {
        return q
 }
 
-func queue2slice(q chan *pb.ArtifactMetadata) []*pb.ArtifactMetadata {
-       var ret []*pb.ArtifactMetadata
+func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata {
+       var ret []*jobpb.ArtifactMetadata
        for elm := range q {
                ret = append(ret, elm)
        }

Reply via email to