This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch lostluck-protosuffix
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/lostluck-protosuffix by this 
push:
     new e1a2730  Update statemgr.go
e1a2730 is described below

commit e1a2730a3adc573dec73f2bc6e0cbecc3db524c8
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue Apr 7 22:38:30 2020 -0700

    Update statemgr.go
---
 sdks/go/pkg/beam/core/runtime/harness/statemgr.go | 46 +++++++++++------------
 1 file changed, 23 insertions(+), 23 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
index 4c0e7f5..3ba9d27 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -26,7 +26,7 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
-       pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/golang/protobuf/proto"
 )
 
@@ -104,7 +104,7 @@ func (s *ScopedStateReader) Close() error {
 
 type stateKeyReader struct {
        instID instructionID
-       key    *pb.StateKey
+       key    *fnpb.StateKey
 
        token []byte
        buf   []byte
@@ -116,9 +116,9 @@ type stateKeyReader struct {
 }
 
 func newSideInputReader(ch *StateChannel, id exec.StreamID, sideInputID 
string, instID instructionID, k, w []byte) *stateKeyReader {
-       key := &pb.StateKey{
-               Type: &pb.StateKey_MultimapSideInput_{
-                       MultimapSideInput: &pb.StateKey_MultimapSideInput{
+       key := &fnpb.StateKey{
+               Type: &fnpb.StateKey_MultimapSideInput_{
+                       MultimapSideInput: &fnpb.StateKey_MultimapSideInput{
                                TransformId: id.PtransformID,
                                SideInputId: sideInputID,
                                Window:      w,
@@ -134,9 +134,9 @@ func newSideInputReader(ch *StateChannel, id exec.StreamID, 
sideInputID string,
 }
 
 func newRunnerReader(ch *StateChannel, instID instructionID, k []byte) 
*stateKeyReader {
-       key := &pb.StateKey{
-               Type: &pb.StateKey_Runner_{
-                       Runner: &pb.StateKey_Runner{
+       key := &fnpb.StateKey{
+               Type: &fnpb.StateKey_Runner_{
+                       Runner: &fnpb.StateKey_Runner{
                                Key: k,
                        },
                },
@@ -164,12 +164,12 @@ func (r *stateKeyReader) Read(buf []byte) (int, error) {
                localChannel := r.ch
                r.mu.Unlock()
 
-               req := &pb.StateRequest{
+               req := &fnpb.StateRequest{
                        // Id: set by StateChannel
                        InstructionId: string(r.instID),
                        StateKey:      r.key,
-                       Request: &pb.StateRequest_Get{
-                               Get: &pb.StateGetRequest{
+                       Request: &fnpb.StateRequest_Get{
+                               Get: &fnpb.StateGetRequest{
                                        ContinuationToken: r.token,
                                },
                        },
@@ -243,8 +243,8 @@ func (m *StateChannelManager) Open(ctx context.Context, 
port exec.Port) (*StateC
 }
 
 type stateClient interface {
-       Send(*pb.StateRequest) error
-       Recv() (*pb.StateResponse, error)
+       Send(*fnpb.StateRequest) error
+       Recv() (*fnpb.StateResponse, error)
 }
 
 // StateChannel manages state transactions over a single gRPC connection.
@@ -254,10 +254,10 @@ type StateChannel struct {
        id     string
        client stateClient
 
-       requests      chan *pb.StateRequest
+       requests      chan *fnpb.StateRequest
        nextRequestNo int32
 
-       responses map[string]chan<- *pb.StateResponse
+       responses map[string]chan<- *fnpb.StateResponse
        mu        sync.Mutex
 
        // a closure that forces the state manager to recreate this stream.
@@ -285,7 +285,7 @@ func newStateChannel(ctx context.Context, port exec.Port) 
(*StateChannel, error)
        if err != nil {
                return nil, errors.Wrapf(err, "failed to connect to state 
service %v", port.URL)
        }
-       client, err := pb.NewBeamFnStateClient(cc).State(ctx)
+       client, err := fnpb.NewBeamFnStateClient(cc).State(ctx)
        if err != nil {
                cc.Close()
                return nil, errors.Wrapf(err, "failed to create state client 
%v", port.URL)
@@ -297,8 +297,8 @@ func makeStateChannel(ctx context.Context, cancelFn 
context.CancelFunc, id strin
        ret := &StateChannel{
                id:        id,
                client:    client,
-               requests:  make(chan *pb.StateRequest, 10),
-               responses: make(map[string]chan<- *pb.StateResponse),
+               requests:  make(chan *fnpb.StateRequest, 10),
+               responses: make(map[string]chan<- *fnpb.StateResponse),
                cancelFn:  cancelFn,
                DoneCh:    ctx.Done(),
        }
@@ -346,7 +346,7 @@ func (c *StateChannel) write(ctx context.Context) {
        var err error
        var id string
        for {
-               var req *pb.StateRequest
+               var req *fnpb.StateRequest
                select {
                case req = <-c.requests:
                case <-c.DoneCh: // Close the goroutine on context cancel.
@@ -380,16 +380,16 @@ func (c *StateChannel) write(ctx context.Context) {
        c.terminateStreamOnError(err)
 
        if ok {
-               ch <- &pb.StateResponse{Id: id, Error: 
fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)}
+               ch <- &fnpb.StateResponse{Id: id, Error: 
fmt.Sprintf("StateChannel[%v].write failed to send: %v", c.id, err)}
        }
 }
 
 // Send sends a state request and returns the response.
-func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) {
+func (c *StateChannel) Send(req *fnpb.StateRequest) (*fnpb.StateResponse, 
error) {
        id := fmt.Sprintf("r%v", atomic.AddInt32(&c.nextRequestNo, 1))
        req.Id = id
 
-       ch := make(chan *pb.StateResponse, 1)
+       ch := make(chan *fnpb.StateResponse, 1)
        c.mu.Lock()
        if c.closedErr != nil {
                defer c.mu.Unlock()
@@ -400,7 +400,7 @@ func (c *StateChannel) Send(req *pb.StateRequest) 
(*pb.StateResponse, error) {
 
        c.requests <- req
 
-       var resp *pb.StateResponse
+       var resp *fnpb.StateResponse
        select {
        case resp = <-ch:
        case <-c.DoneCh:

Reply via email to