lostluck commented on a change in pull request #11490:
URL: https://github.com/apache/beam/pull/11490#discussion_r412599227



##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
        "context"
+       "io"
+       "os"
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/artifact"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files 
...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) 
(retrievalToken string, err error) {
        ctx = grpcx.WriteWorkerID(ctx, id)
        cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
        if err != nil {
                return "", errors.WithContext(err, "connecting to artifact 
service")
        }
        defer cc.Close()
 
+       err = StageViaPorableApi(ctx, cc, binary, st)
+
+       if err == nil {
+               return "", err
+       } else {
+               return StageViaLegacyApi(ctx, cc, binary, st)
+       }
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st 
string) error {

Review comment:
       Typo: Portable

##########
File path: sdks/go/pkg/beam/core/runtime/graphx/translate.go
##########
@@ -93,14 +96,32 @@ func CreateEnvironment(ctx context.Context, urn string, 
extractEnvironmentConfig
                        panic(fmt.Sprintf(
                                "Failed to serialize Environment payload %v for 
config %v: %v", payload, config, err))
                }
+
                return &pipepb.Environment{
                        Urn:          urn,
                        Payload:      serializedPayload,
                        Capabilities: goCapabilities(),
+                       Dependencies: []*pipepb.ArtifactInformation{
+                               &pipepb.ArtifactInformation{
+                                       TypeUrn: URNArtifactGoWorker,
+                                       RoleUrn: URNArtifactStagingTo,
+                                       RolePayload: 
MarshalOrPanic(&pipepb.ArtifactStagingToRolePayload{
+                                               StagedName: "worker",
+                                       }),
+                               },
+                       },
                }
        }
 }
 
+func MarshalOrPanic(msg proto.Message) []byte {

Review comment:
       Conventionally this should be named MustMarshal if it's going to panic.

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
        "context"
+       "io"
+       "os"
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/artifact"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files 
...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) 
(retrievalToken string, err error) {
        ctx = grpcx.WriteWorkerID(ctx, id)
        cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
        if err != nil {
                return "", errors.WithContext(err, "connecting to artifact 
service")
        }
        defer cc.Close()
 
+       err = StageViaPorableApi(ctx, cc, binary, st)
+
+       if err == nil {
+               return "", err
+       } else {
+               return StageViaLegacyApi(ctx, cc, binary, st)
+       }
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st 
string) error {
+       client := jobpb.NewArtifactStagingServiceClient(cc)
+
+       stream, err := 
client.ReverseArtifactRetrievalService(context.Background())
+       if err != nil {
+               return err
+       }
+
+       if err := stream.Send(&jobpb.ArtifactResponseWrapper{StagingToken: 
st}); err != nil {
+               return err
+       }
+
+       for {
+               in, err := stream.Recv()
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+
+               switch request := in.Request.(type) {
+               case *jobpb.ArtifactRequestWrapper_ResolveArtifact:
+                       err = stream.Send(&jobpb.ArtifactResponseWrapper{
+                               Response: 
&jobpb.ArtifactResponseWrapper_ResolveArtifactResponse{
+                                       &jobpb.ResolveArtifactsResponse{
+                                               Replacements: 
request.ResolveArtifact.Artifacts,
+                                       },
+                               }})
+                       if err != nil {
+                               return err
+                       }
+
+               case *jobpb.ArtifactRequestWrapper_GetArtifact:
+                       TypeUrn := request.GetArtifact.Artifact.TypeUrn

Review comment:
       While not incorrect, it's a bit odd to see a capitalized variable name 
in Go because a capital letter indicates the identifier is Exported from the 
package. 
   
   From a readability standpoint the capital initial tells the reader that the 
scope of this variable extends beyond this package, which isn't the case for a 
local variable in function scope. At the definition line (here) this is fine, 
but at later uses, it would be easy to make the mistake, miamatchign 
assumptions.
   
   MixedCaps are correct though.
   
   Consider typeUrn instead.
   
   As a second note: you can put the definition inline in the switch header
   
   switch typeUrn := ... ; typeUrn {
   case ...
   
   which limits its scope to just the switch.

##########
File path: sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
##########
@@ -17,27 +17,128 @@ package runnerlib
 
 import (
        "context"
+       "io"
+       "os"
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/artifact"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        jobpb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
+       "google.golang.org/grpc"
 )
 
 // Stage stages the worker binary and any additional files to the given
 // artifact staging endpoint. It returns the retrieval token if successful.
-func Stage(ctx context.Context, id, endpoint, binary, st string, files 
...artifact.KeyedFile) (retrievalToken string, err error) {
+func Stage(ctx context.Context, id, endpoint, binary, st string) 
(retrievalToken string, err error) {
        ctx = grpcx.WriteWorkerID(ctx, id)
        cc, err := grpcx.Dial(ctx, endpoint, 2*time.Minute)
        if err != nil {
                return "", errors.WithContext(err, "connecting to artifact 
service")
        }
        defer cc.Close()
 
+       err = StageViaPorableApi(ctx, cc, binary, st)
+
+       if err == nil {
+               return "", err
+       } else {
+               return StageViaLegacyApi(ctx, cc, binary, st)
+       }
+}
+
+func StageViaPorableApi(ctx context.Context, cc *grpc.ClientConn, binary, st 
string) error {
+       client := jobpb.NewArtifactStagingServiceClient(cc)
+
+       stream, err := 
client.ReverseArtifactRetrievalService(context.Background())
+       if err != nil {
+               return err
+       }
+
+       if err := stream.Send(&jobpb.ArtifactResponseWrapper{StagingToken: 
st}); err != nil {
+               return err
+       }
+
+       for {
+               in, err := stream.Recv()
+               if err == io.EOF {
+                       return nil
+               }
+               if err != nil {
+                       return err
+               }
+
+               switch request := in.Request.(type) {
+               case *jobpb.ArtifactRequestWrapper_ResolveArtifact:
+                       err = stream.Send(&jobpb.ArtifactResponseWrapper{
+                               Response: 
&jobpb.ArtifactResponseWrapper_ResolveArtifactResponse{
+                                       &jobpb.ResolveArtifactsResponse{
+                                               Replacements: 
request.ResolveArtifact.Artifacts,
+                                       },
+                               }})
+                       if err != nil {
+                               return err
+                       }
+
+               case *jobpb.ArtifactRequestWrapper_GetArtifact:
+                       TypeUrn := request.GetArtifact.Artifact.TypeUrn
+                       switch TypeUrn {
+                       case graphx.URNArtifactGoWorker:
+                               StageFile(binary, stream)
+
+                       default:
+                               return errors.Errorf("Request has unexpected 
artifact type %s", TypeUrn)
+                       }
+
+               default:
+                       return errors.Errorf("Request has unexpected type %T", 
request)

Review comment:
       Lint Nit: due to how errors are used in go, via prepending context, it's 
preferred to not capitalize error strings, so they end up reading better with 
the prepended context.
   
   Here and everywhere else Errorf is used.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to