This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/lostluck-protosuffix by this push: new e1a2730 Update statemgr.go e1a2730 is described below commit e1a2730a3adc573dec73f2bc6e0cbecc3db524c8 Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Tue Apr 7 22:38:30 2020 -0700 Update statemgr.go --- sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 46 +++++++++++------------ 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go index 4c0e7f5..3ba9d27 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go @@ -26,7 +26,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec" "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" "github.com/apache/beam/sdks/go/pkg/beam/log" - pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" + fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1" "github.com/golang/protobuf/proto" ) @@ -104,7 +104,7 @@ func (s *ScopedStateReader) Close() error { type stateKeyReader struct { instID instructionID - key *pb.StateKey + key *fnpb.StateKey token []byte buf []byte @@ -116,9 +116,9 @@ type stateKeyReader struct { } func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID string, instID instructionID, k, w []byte) *stateKeyReader { - key := &pb.StateKey{ - Type: &pb.StateKey_MultimapSideInput_{ - MultimapSideInput: &pb.StateKey_MultimapSideInput{ + key := &fnpb.StateKey{ + Type: &fnpb.StateKey_MultimapSideInput_{ + MultimapSideInput: &fnpb.StateKey_MultimapSideInput{ TransformId: id.PtransformID, SideInputId: sideInputID, Window: w, @@ -134,9 +134,9 @@ func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID string, } func newRunnerReader(ch *StateChannel, instID instructionID, k []byte) *stateKeyReader { - key := &pb.StateKey{ - Type: &pb.StateKey_Runner_{ - Runner: 
&pb.StateKey_Runner{ + key := &fnpb.StateKey{ + Type: &fnpb.StateKey_Runner_{ + Runner: &fnpb.StateKey_Runner{ Key: k, }, }, @@ -164,12 +164,12 @@ func (r *stateKeyReader) Read(buf []byte) (int, error) { localChannel := r.ch r.mu.Unlock() - req := &pb.StateRequest{ + req := &fnpb.StateRequest{ // Id: set by StateChannel InstructionId: string(r.instID), StateKey: r.key, - Request: &pb.StateRequest_Get{ - Get: &pb.StateGetRequest{ + Request: &fnpb.StateRequest_Get{ + Get: &fnpb.StateGetRequest{ ContinuationToken: r.token, }, }, @@ -243,8 +243,8 @@ func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateC } type stateClient interface { - Send(*pb.StateRequest) error - Recv() (*pb.StateResponse, error) + Send(*fnpb.StateRequest) error + Recv() (*fnpb.StateResponse, error) } // StateChannel manages state transactions over a single gRPC connection. @@ -254,10 +254,10 @@ type StateChannel struct { id string client stateClient - requests chan *pb.StateRequest + requests chan *fnpb.StateRequest nextRequestNo int32 - responses map[string]chan<- *pb.StateResponse + responses map[string]chan<- *fnpb.StateResponse mu sync.Mutex // a closure that forces the state manager to recreate this stream. 
@@ -285,7 +285,7 @@ func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, error) if err != nil { return nil, errors.Wrapf(err, "failed to connect to state service %v", port.URL) } - client, err := pb.NewBeamFnStateClient(cc).State(ctx) + client, err := fnpb.NewBeamFnStateClient(cc).State(ctx) if err != nil { cc.Close() return nil, errors.Wrapf(err, "failed to create state client %v", port.URL) @@ -297,8 +297,8 @@ func makeStateChannel(ctx context.Context, cancelFn context.CancelFunc, id strin ret := &StateChannel{ id: id, client: client, - requests: make(chan *pb.StateRequest, 10), - responses: make(map[string]chan<- *pb.StateResponse), + requests: make(chan *fnpb.StateRequest, 10), + responses: make(map[string]chan<- *fnpb.StateResponse), cancelFn: cancelFn, DoneCh: ctx.Done(), } @@ -346,7 +346,7 @@ func (c *StateChannel) write(ctx context.Context) { var err error var id string for { - var req *pb.StateRequest + var req *fnpb.StateRequest select { case req = <-c.requests: case <-c.DoneCh: // Close the goroutine on context cancel. @@ -380,16 +380,16 @@ func (c *StateChannel) write(ctx context.Context) { c.terminateStreamOnError(err) if ok { - ch <- &pb.StateResponse{Id: id, Error: fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)} + ch <- &fnpb.StateResponse{Id: id, Error: fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)} } } // Send sends a state request and returns the response. 
-func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) { +func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, error) { id := fmt.Sprintf("r%v", atomic.AddInt32(&c.nextRequestNo, 1)) req.Id = id - ch := make(chan *pb.StateResponse, 1) + ch := make(chan *fnpb.StateResponse, 1) c.mu.Lock() if c.closedErr != nil { defer c.mu.Unlock() @@ -400,7 +400,7 @@ func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) { c.requests <- req - var resp *pb.StateResponse + var resp *fnpb.StateResponse select { case resp = <-ch: case <-c.DoneCh: