This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch lostluck-protosuffix in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3f92f32e88686a6e8e9b78fc0a8509322d2c4fab Author: Robert Burke <[email protected]> AuthorDate: Tue Apr 7 21:59:24 2020 -0700 Update session.go Make controlServer actually implement the control server interface. Move the import name to match shorter convention. --- sdks/go/pkg/beam/runners/session/session.go | 30 ++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/runners/session/session.go b/sdks/go/pkg/beam/runners/session/session.go index d814882..f917bc0 100644 --- a/sdks/go/pkg/beam/runners/session/session.go +++ b/sdks/go/pkg/beam/runners/session/session.go @@ -34,8 +34,8 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" - fnapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" - rapi_pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" + pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1" "github.com/golang/protobuf/proto" "google.golang.org/grpc" ) @@ -56,13 +56,13 @@ var sessionFile = flag.String("session_file", "", "Session file for the runner") type controlServer struct { filename string wg *sync.WaitGroup // used to signal when the session is completed - ctrlStream fnapi_pb.BeamFnControl_ControlServer + ctrlStream fnpb.BeamFnControl_ControlServer dataServer *grpc.Server - dataStream fnapi_pb.BeamFnData_DataServer + dataStream fnpb.BeamFnData_DataServer dwg *sync.WaitGroup } -func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) error { +func (c *controlServer) Control(stream fnpb.BeamFnControl_ControlServer) error { fmt.Println("Go SDK connected") c.ctrlStream = stream // We have a connected worker. 
Start reading the session file and issuing @@ -74,6 +74,10 @@ func (c *controlServer) Control(stream fnapi_pb.BeamFnControl_ControlServer) err return nil } +func (c *controlServer) GetProcessBundleDescriptor(ctx context.Context, r *fnpb.GetProcessBundleDescriptorRequest) (*fnpb.ProcessBundleDescriptor, error) { + return nil, nil +} + func (c *controlServer) establishDataChannel(beamPort, tcpPort string) { if c.dataServer != nil { // Already a data server, we're done @@ -87,7 +91,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) { // the data server is listening on. c.dataServer = grpc.NewServer() - fnapi_pb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c}) + fnpb.RegisterBeamFnDataServer(c.dataServer, &dataServer{ctrl: c}) dp, err := net.Listen("tcp", tcpPort) if err != nil { panic(err) @@ -97,7 +101,7 @@ func (c *controlServer) establishDataChannel(beamPort, tcpPort string) { go c.dataServer.Serve(dp) } -func (c *controlServer) registerStream(stream fnapi_pb.BeamFnData_DataServer) { +func (c *controlServer) registerStream(stream fnpb.BeamFnData_DataServer) { c.dataStream = stream c.dwg.Done() } @@ -194,8 +198,8 @@ func (c *controlServer) handleEntry(msg *session.Entry) { } } -func extractPortSpec(spec *rapi_pb.FunctionSpec) string { - var port fnapi_pb.RemoteGrpcPort +func extractPortSpec(spec *pipepb.FunctionSpec) string { + var port fnpb.RemoteGrpcPort if err := proto.Unmarshal(spec.GetPayload(), &port); err != nil { panic(err) } @@ -213,7 +217,7 @@ type dataServer struct { ctrl *controlServer } -func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error { +func (d *dataServer) Data(stream fnpb.BeamFnData_DataServer) error { // This goroutine is only used for reading data. The stream object // is passed to the control server so that all data is sent from // a single goroutine to ensure proper ordering. 
@@ -239,7 +243,7 @@ func (d *dataServer) Data(stream fnapi_pb.BeamFnData_DataServer) error { // loggingServer manages the FnAPI logging channel. type loggingServer struct{} // no data content -func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) error { +func (l *loggingServer) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { // This stream object is only used here. The stream is used for receiving, and // no sends happen on it. for { @@ -269,7 +273,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { // Start up the grpc logging service. ls := grpc.NewServer() - fnapi_pb.RegisterBeamFnLoggingServer(ls, &loggingServer{}) + fnpb.RegisterBeamFnLoggingServer(ls, &loggingServer{}) logPort, err := net.Listen("tcp", ":0") if err != nil { panic("No logging port") @@ -282,7 +286,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { wg.Add(1) cs := grpc.NewServer() - fnapi_pb.RegisterBeamFnControlServer(cs, &controlServer{ + fnpb.RegisterBeamFnControlServer(cs, &controlServer{ filename: *sessionFile, wg: &wg, })
