This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this push:
     new cf19153  Update staging.go
cf19153 is described below

commit cf191537093d185f577b9a6f8a4349326acbae75
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 22:28:52 2020 -0700

    Update staging.go
---
 sdks/go/pkg/beam/artifact/gcsproxy/staging.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
index fa5933b..095450e 100644
--- a/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
+++ b/sdks/go/pkg/beam/artifact/gcsproxy/staging.go
@@ -26,7 +26,7 @@ import (
 
        "cloud.google.com/go/storage"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
-       pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
+       jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
        "github.com/golang/protobuf/proto"
        "golang.org/x/net/context"
@@ -64,7 +64,7 @@ func NewStagingServer(manifest string) (*StagingServer, error) {
 }
 
 // CommitManifest commits the given artifact manifest to GCS.
-func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
+func (s *StagingServer) CommitManifest(ctx context.Context, req *jobpb.CommitManifestRequest) (*jobpb.CommitManifestResponse, error) {
        manifest := req.GetManifest()
 
        s.mu.Lock()
@@ -75,7 +75,7 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
        }
        s.mu.Unlock()
 
-       data, err := proto.Marshal(&pb.ProxyManifest{Manifest: manifest, Location: loc})
+       data, err := proto.Marshal(&jobpb.ProxyManifest{Manifest: manifest, Location: loc})
        if err != nil {
                return nil, errors.Wrap(err, "failed to marshal proxy manifest")
        }
@@ -93,13 +93,13 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife
        // now, but would be needed for a staging server that serves multiple
        // jobs. Such a server would also use the ID sent with each request.
 
-       return &pb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
+       return &jobpb.CommitManifestResponse{RetrievalToken: gcsx.MakeObject(s.bucket, s.manifest)}, nil
 }
 
 // matchLocations ensures that all artifacts have been staged and have valid
 // content. It is fine for staged artifacts to not appear in the manifest.
-func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) ([]*pb.ProxyManifest_Location, error) {
-       var loc []*pb.ProxyManifest_Location
+func matchLocations(artifacts []*jobpb.ArtifactMetadata, blobs map[string]staged) ([]*jobpb.ProxyManifest_Location, error) {
+       var loc []*jobpb.ProxyManifest_Location
        for _, a := range artifacts {
                info, ok := blobs[a.Name]
                if !ok {
@@ -112,13 +112,13 @@ func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) (
                        return nil, errors.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
                }
 
-               loc = append(loc, &pb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
+               loc = append(loc, &jobpb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
        }
        return loc, nil
 }
 
 // PutArtifact stores the given artifact in GCS.
-func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifactServer) error {
+func (s *StagingServer) PutArtifact(ps jobpb.LegacyArtifactStagingService_PutArtifactServer) error {
        // Read header
 
        header, err := ps.Recv()
@@ -153,7 +153,7 @@ func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifa
        s.blobs[md.Name] = staged{object: gcsx.MakeObject(s.bucket, object), hash: hash}
        s.mu.Unlock()
 
-       return ps.SendAndClose(&pb.PutArtifactResponse{})
+       return ps.SendAndClose(&jobpb.PutArtifactResponse{})
 }
 
 // reader is an adapter between the artifact stream and the GCS stream reader.
@@ -161,7 +161,7 @@ func (s *StagingServer) PutArtifact(ps pb.LegacyArtifactStagingService_PutArtifa
 type reader struct {
        sha256W hash.Hash
        buf     []byte
-       stream  pb.LegacyArtifactStagingService_PutArtifactServer
+       stream  jobpb.LegacyArtifactStagingService_PutArtifactServer
 }
 
 func (r *reader) Read(buf []byte) (int, error) {

Reply via email to