This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this
push:
new a4dcf29 Update materialize.go
a4dcf29 is described below
commit a4dcf29f652eadec083641477e7ba9a83a0f6d6a
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:17:31 2020 -0700
Update materialize.go
---
sdks/go/pkg/beam/artifact/materialize.go | 52 ++++++++++++++++----------------
1 file changed, 26 insertions(+), 26 deletions(-)
diff --git a/sdks/go/pkg/beam/artifact/materialize.go
b/sdks/go/pkg/beam/artifact/materialize.go
index 859381b..8f7c3af 100644
--- a/sdks/go/pkg/beam/artifact/materialize.go
+++ b/sdks/go/pkg/beam/artifact/materialize.go
@@ -30,8 +30,8 @@ import (
"time"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
- pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
- ppb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"github.com/golang/protobuf/proto"
@@ -49,40 +49,40 @@ const (
// present.
// TODO(BEAM-9577): Return a mapping of filename to dependency, rather than
[]*pb.ArtifactMetadata.
// TODO(BEAM-9577): Leverage richness of roles rather than magic names to
understand artifacts.
-func Materialize(ctx context.Context, endpoint string, dependencies
[]*ppb.ArtifactInformation, rt string, dest string) ([]*pb.ArtifactMetadata,
error) {
+func Materialize(ctx context.Context, endpoint string, dependencies
[]*pipepb.ArtifactInformation, rt string, dest string)
([]*jobpb.ArtifactMetadata, error) {
if len(dependencies) > 0 {
return newMaterialize(ctx, endpoint, dependencies, dest)
} else if rt == "" || rt == NoArtifactsStaged {
- return []*pb.ArtifactMetadata{}, nil
+ return []*jobpb.ArtifactMetadata{}, nil
} else {
return legacyMaterialize(ctx, endpoint, rt, dest)
}
}
-func newMaterialize(ctx context.Context, endpoint string, dependencies
[]*ppb.ArtifactInformation, dest string) ([]*pb.ArtifactMetadata, error) {
+func newMaterialize(ctx context.Context, endpoint string, dependencies
[]*pipepb.ArtifactInformation, dest string) ([]*jobpb.ArtifactMetadata, error) {
cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
if err != nil {
return nil, err
}
defer cc.Close()
- return newMaterializeWithClient(ctx,
pb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
+ return newMaterializeWithClient(ctx,
jobpb.NewArtifactRetrievalServiceClient(cc), dependencies, dest)
}
-func newMaterializeWithClient(ctx context.Context, client
pb.ArtifactRetrievalServiceClient, dependencies []*ppb.ArtifactInformation,
dest string) ([]*pb.ArtifactMetadata, error) {
- resolution, err := client.ResolveArtifacts(ctx,
&pb.ResolveArtifactsRequest{Artifacts: dependencies})
+func newMaterializeWithClient(ctx context.Context, client
jobpb.ArtifactRetrievalServiceClient, dependencies
[]*pipepb.ArtifactInformation, dest string) ([]*jobpb.ArtifactMetadata, error) {
+ resolution, err := client.ResolveArtifacts(ctx,
&jobpb.ResolveArtifactsRequest{Artifacts: dependencies})
if err != nil {
return nil, err
}
- var md []*pb.ArtifactMetadata
+ var md []*jobpb.ArtifactMetadata
var list []retrievable
for _, dep := range resolution.Replacements {
path, err := extractStagingToPath(dep)
if err != nil {
return nil, err
}
- md = append(md, &pb.ArtifactMetadata{
+ md = append(md, &jobpb.ArtifactMetadata{
Name: path,
})
@@ -95,11 +95,11 @@ func newMaterializeWithClient(ctx context.Context, client
pb.ArtifactRetrievalSe
return md, MultiRetrieve(ctx, 10, list, dest)
}
-func extractStagingToPath(artifact *ppb.ArtifactInformation) (string, error) {
+func extractStagingToPath(artifact *pipepb.ArtifactInformation) (string,
error) {
if artifact.RoleUrn != URNStagingTo {
return "", errors.Errorf("Unsupported artifact role %s",
artifact.RoleUrn)
}
- role := ppb.ArtifactStagingToRolePayload{}
+ role := pipepb.ArtifactStagingToRolePayload{}
if err := proto.Unmarshal(artifact.RolePayload, &role); err != nil {
return "", err
}
@@ -107,8 +107,8 @@ func extractStagingToPath(artifact
*ppb.ArtifactInformation) (string, error) {
}
type artifact struct {
- client pb.ArtifactRetrievalServiceClient
- dep *ppb.ArtifactInformation
+ client jobpb.ArtifactRetrievalServiceClient
+ dep *pipepb.ArtifactInformation
}
func (a artifact) retrieve(ctx context.Context, dest string) error {
@@ -128,7 +128,7 @@ func (a artifact) retrieve(ctx context.Context, dest
string) error {
return errors.Wrapf(err, "failed to stat %v", filename)
}
- stream, err := a.client.GetArtifact(ctx,
&pb.GetArtifactRequest{Artifact: a.dep})
+ stream, err := a.client.GetArtifact(ctx,
&jobpb.GetArtifactRequest{Artifact: a.dep})
if err != nil {
return err
}
@@ -151,7 +151,7 @@ func (a artifact) retrieve(ctx context.Context, dest
string) error {
return fd.Close()
}
-func writeChunks(stream pb.ArtifactRetrievalService_GetArtifactClient, w
io.Writer) error {
+func writeChunks(stream jobpb.ArtifactRetrievalService_GetArtifactClient, w
io.Writer) error {
for {
chunk, err := stream.Recv()
if err == io.EOF {
@@ -168,16 +168,16 @@ func writeChunks(stream
pb.ArtifactRetrievalService_GetArtifactClient, w io.Writ
return nil
}
-func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest
string) ([]*pb.ArtifactMetadata, error) {
+func legacyMaterialize(ctx context.Context, endpoint string, rt string, dest
string) ([]*jobpb.ArtifactMetadata, error) {
cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
if err != nil {
return nil, err
}
defer cc.Close()
- client := pb.NewLegacyArtifactRetrievalServiceClient(cc)
+ client := jobpb.NewLegacyArtifactRetrievalServiceClient(cc)
- m, err := client.GetManifest(ctx,
&pb.GetManifestRequest{RetrievalToken: rt})
+ m, err := client.GetManifest(ctx,
&jobpb.GetManifestRequest{RetrievalToken: rt})
if err != nil {
return nil, errors.Wrap(err, "failed to get manifest")
}
@@ -249,7 +249,7 @@ type retrievable interface {
}
// LegacyMultiRetrieve is exported for testing.
-func LegacyMultiRetrieve(ctx context.Context, client
pb.LegacyArtifactRetrievalServiceClient, cpus int, list []*pb.ArtifactMetadata,
rt string, dest string) error {
+func LegacyMultiRetrieve(ctx context.Context, client
jobpb.LegacyArtifactRetrievalServiceClient, cpus int, list
[]*jobpb.ArtifactMetadata, rt string, dest string) error {
var rlist []retrievable
for _, md := range list {
rlist = append(rlist, &legacyArtifact{
@@ -275,7 +275,7 @@ func (a legacyArtifact) retrieve(ctx context.Context, dest
string) error {
// retrieved. If not, it retrieves into the dest directory. It overwrites any
// previous retrieval attempt and may leave a corrupt/partial local file on
// failure.
-func Retrieve(ctx context.Context, client
pb.LegacyArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, rt string,
dest string) error {
+func Retrieve(ctx context.Context, client
jobpb.LegacyArtifactRetrievalServiceClient, a *jobpb.ArtifactMetadata, rt
string, dest string) error {
filename := filepath.Join(dest, filepath.FromSlash(a.Name))
_, err := os.Stat(filename)
@@ -310,8 +310,8 @@ func Retrieve(ctx context.Context, client
pb.LegacyArtifactRetrievalServiceClien
// It validates that the given SHA256 matches the content and fails otherwise.
// It expects the file to not exist, but does not clean up on failure and
// may leave a corrupt file.
-func retrieve(ctx context.Context, client
pb.LegacyArtifactRetrievalServiceClient, a *pb.ArtifactMetadata, rt string,
filename string) error {
- stream, err := client.GetArtifact(ctx,
&pb.LegacyGetArtifactRequest{Name: a.Name, RetrievalToken: rt})
+func retrieve(ctx context.Context, client
jobpb.LegacyArtifactRetrievalServiceClient, a *jobpb.ArtifactMetadata, rt
string, filename string) error {
+ stream, err := client.GetArtifact(ctx,
&jobpb.LegacyGetArtifactRequest{Name: a.Name, RetrievalToken: rt})
if err != nil {
return err
}
@@ -342,7 +342,7 @@ func retrieve(ctx context.Context, client
pb.LegacyArtifactRetrievalServiceClien
return nil
}
-func retrieveChunks(stream
pb.LegacyArtifactRetrievalService_GetArtifactClient, w io.Writer) (string,
error) {
+func retrieveChunks(stream
jobpb.LegacyArtifactRetrievalService_GetArtifactClient, w io.Writer) (string,
error) {
sha256W := sha256.New()
for {
chunk, err := stream.Recv()
@@ -398,8 +398,8 @@ func slice2queue(list []retrievable) chan retrievable {
return q
}
-func queue2slice(q chan *pb.ArtifactMetadata) []*pb.ArtifactMetadata {
- var ret []*pb.ArtifactMetadata
+func queue2slice(q chan *jobpb.ArtifactMetadata) []*jobpb.ArtifactMetadata {
+ var ret []*jobpb.ArtifactMetadata
for elm := range q {
ret = append(ret, elm)
}