This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3f92f32e88686a6e8e9b78fc0a8509322d2c4fab
Author: Robert Burke <[email protected]>
AuthorDate: Tue Apr 7 21:59:24 2020 -0700

    Update session.go
    
    Make controlServer actually implement the control server interface.
    Move the import name to match shorter convention.
---
 sdks/go/pkg/beam/runners/session/session.go | 30 ++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/session/session.go b/sdks/go/pkg/beam/runners/session/session.go
index d814882..f917bc0 100644
--- a/sdks/go/pkg/beam/runners/session/session.go
+++ b/sdks/go/pkg/beam/runners/session/session.go
@@ -34,8 +34,8 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
-       fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
-       rapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
        "github.com/golang/protobuf/proto"
        "google.golang.org/grpc"
 )
@@ -56,13 +56,13 @@ var sessionFile = flag.String("session_file", "", "Session file for the runner")
 type controlServer struct {
        filename   string
        wg         *sync.WaitGroup // used to signal when the session is completed
-       ctrlStream fnapi_pb.BeamFnControl_ControlServer
+       ctrlStream fnpb.BeamFnControl_ControlServer
        dataServer *grpc.Server
-       dataStream fnapi_pb.BeamFnData_DataServer
+       dataStream fnpb.BeamFnData_DataServer
        dwg        *sync.WaitGroup
 }
 
-func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) error {
+func (c *controlServer) Control(stream fnpb.BeamFnControl_ControlServer) error {
        fmt.Println("Go SDK connected")
        c.ctrlStream = stream
        // We have a connected worker. Start reading the session file and issuing
@@ -74,6 +74,10 @@ func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) err
        return nil
 }
 
+func (c *controlServer) GetProcessBundleDescriptor(ctx context.Context, r *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) {
+       return nil, nil
+}
+
 func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
        if c.dataServer != nil {
                // Already a data server, we're done
@@ -87,7 +91,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
        // the data server is listening on.
 
        c.dataServer = grpc.NewServer()
-       fnapi_pb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c})
+       fnpb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c})
        dp, err := net.Listen("tcp", tcpPort)
        if err != nil {
                panic(err)
@@ -97,7 +101,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) {
        go c.dataServer.Serve(dp)
 }
 
-func (c *controlServer) registerStream(stream fnapi_pb.BeamFnData_DataServer) {
+func (c *controlServer) registerStream(stream fnpb.BeamFnData_DataServer) {
        c.dataStream = stream
        c.dwg.Done()
 }
@@ -194,8 +198,8 @@ func (c *controlServer) handleEntry(msg *session.Entry) {
        }
 }
 
-func extractPortSpec(spec *rapi_pb.FunctionSpec) string {
-       var port fnapi_pb.RemoteGrpcPort
+func extractPortSpec(spec *pipepb.FunctionSpec) string {
+       var port fnpb.RemoteGrpcPort
        if err := proto.Unmarshal(spec.GetPayload(), &port); err != nil {
                panic(err)
        }
@@ -213,7 +217,7 @@ type dataServer struct {
        ctrl *controlServer
 }
 
-func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error {
+func (d *dataServer) Data(stream fnpb.BeamFnData_DataServer) error {
        // This goroutine is only used for reading data. The stream object
        // is passed to the control server so that all data is sent from
        // a single goroutine to ensure proper ordering.
@@ -239,7 +243,7 @@ func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error {
 // loggingServer manages the FnAPI logging channel.
 type loggingServer struct{} // no data content
 
-func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) error {
+func (l *loggingServer) Logging(stream fnpb.BeamFnLogging_LoggingServer) error {
        // This stream object is only used here. The stream is used for receiving, and
        // no sends happen on it.
        for {
@@ -269,7 +273,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
 
        // Start up the grpc logging service.
        ls := grpc.NewServer()
-       fnapi_pb.RegisterBeamFnLoggingServer(ls, &loggingServer{})
+       fnpb.RegisterBeamFnLoggingServer(ls, &loggingServer{})
        logPort, err := net.Listen("tcp", ":0")
        if err != nil {
                panic("No logging port")
@@ -282,7 +286,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        wg.Add(1)
 
        cs := grpc.NewServer()
-       fnapi_pb.RegisterBeamFnControlServer(cs, &controlServer{
+       fnpb.RegisterBeamFnControlServer(cs, &controlServer{
                filename: *sessionFile,
                wg:       &wg,
        })

Reply via email to