This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 75eb0b1431c Go stateful DoFns user side changes (#22761)
75eb0b1431c is described below

commit 75eb0b1431c84c98f2e16a9f535b0e11b0160d43
Author: Danny McCormick <[email protected]>
AuthorDate: Thu Aug 18 08:07:30 2022 -0700

    Go stateful DoFns user side changes (#22761)
    
    * Go stateful DoFns user side changes
    
    * Fix static check violation
    
    * Small cleanup
    
    * Doc comments
---
 sdks/go/pkg/beam/core/funcx/fn.go                 |  45 +++-
 sdks/go/pkg/beam/core/funcx/fn_test.go            |  16 ++
 sdks/go/pkg/beam/core/graph/edge.go               |  17 +-
 sdks/go/pkg/beam/core/graph/fn.go                 |  75 +++++-
 sdks/go/pkg/beam/core/graph/fn_test.go            |  89 +++++--
 sdks/go/pkg/beam/core/runtime/exec/fn.go          |  32 ++-
 sdks/go/pkg/beam/core/runtime/exec/translate.go   |  16 ++
 sdks/go/pkg/beam/core/runtime/graphx/serialize.go |   5 +
 sdks/go/pkg/beam/core/runtime/graphx/translate.go |  24 ++
 sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go  | 300 +++++++++++-----------
 sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto  |   2 +
 sdks/go/pkg/beam/core/state/state.go              | 120 +++++++++
 sdks/go/pkg/beam/core/state/state_test.go         | 140 ++++++++++
 sdks/go/pkg/beam/pardo.go                         |  13 +
 14 files changed, 715 insertions(+), 179 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go 
b/sdks/go/pkg/beam/core/funcx/fn.go
index 7d39aa1ce88..788d6d52df6 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -20,6 +20,7 @@ import (
        "reflect"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -82,6 +83,8 @@ const (
        FnBundleFinalization FnParamKind = 0x800
        // FnWatermarkEstimator indicates a function input parameter that 
implements sdf.WatermarkEstimator
        FnWatermarkEstimator FnParamKind = 0x1000
+       // FnStateProvider indicates a function input parameter that implements 
state.Provider
+       FnStateProvider FnParamKind = 0x2000
 )
 
 func (k FnParamKind) String() string {
@@ -112,6 +115,8 @@ func (k FnParamKind) String() string {
                return "BundleFinalization"
        case FnWatermarkEstimator:
                return "WatermarkEstimator"
+       case FnStateProvider:
+               return "StateProvider"
        default:
                return fmt.Sprintf("%v", int(k))
        }
@@ -289,6 +294,17 @@ func (u *Fn) BundleFinalization() (pos int, exists bool) {
        return -1, false
 }
 
+// StateProvider returns (index, true) iff the function expects a
+// parameter that implements state.Provider.
+func (u *Fn) StateProvider() (pos int, exists bool) {
+       for i, p := range u.Param {
+               if p.Kind == FnStateProvider {
+                       return i, true
+               }
+       }
+       return -1, false
+}
+
 // WatermarkEstimator returns (index, true) iff the function expects a
 // parameter that implements sdf.WatermarkEstimator.
 func (u *Fn) WatermarkEstimator() (pos int, exists bool) {
@@ -374,6 +390,8 @@ func New(fn reflectx.Func) (*Fn, error) {
                        kind = FnWindow
                case t == typex.BundleFinalizationType:
                        kind = FnBundleFinalization
+               case t == state.ProviderType:
+                       kind = FnStateProvider
                case t == reflectx.Type:
                        kind = FnType
                case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
@@ -464,7 +482,7 @@ func SubReturns(list []ReturnParam, indices ...int) 
[]ReturnParam {
 }
 
 // The order of present parameters and return values must be as follows:
-// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, 
FnType?, FnBundleFinalization?, FnRTracker?, (FnValue, SideInput*)?, FnEmit*) 
(RetEventTime?, RetOutput?, RetError?)
+// func(FnContext?, FnPane?, FnWindow?, FnEventTime?, FnWatermarkEstimator?, 
FnType?, FnBundleFinalization?, FnRTracker?, FnStateProvider?, (FnValue, 
SideInput*)?, FnEmit*) (RetEventTime?, RetOutput?, RetError?)
 //     where ? indicates 0 or 1, and * indicates any number.
 //     and  a SideInput is one of FnValue or FnIter or FnReIter
 // Note: Fns with inputs must have at least one FnValue as the main input.
@@ -496,6 +514,7 @@ var (
        errReflectTypePrecedence             = errors.New("may only have a 
single reflect.Type parameter and it must precede the main input parameter")
        errRTrackerPrecedence                = errors.New("may only have a 
single sdf.RTracker parameter and it must precede the main input parameter")
        errBundleFinalizationPrecedence      = errors.New("may only have a 
single BundleFinalization parameter and it must precede the main input 
parameter")
+       errStateProviderPrecedence           = errors.New("may only have a 
single state.Provider parameter and it must precede the main input parameter")
        errInputPrecedence                   = errors.New("inputs parameters 
must precede emit function parameters")
 )
 
@@ -513,6 +532,7 @@ const (
        psOutput
        psRTracker
        psBundleFinalization
+       psStateProvider
 )
 
 func nextParamState(cur paramState, transition FnParamKind) (paramState, 
error) {
@@ -535,6 +555,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psContext:
                switch transition {
@@ -552,6 +574,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psPane:
                switch transition {
@@ -567,6 +591,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psWindow:
                switch transition {
@@ -580,6 +606,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psEventTime:
                switch transition {
@@ -591,6 +619,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psWatermarkEstimator:
                switch transition {
@@ -600,6 +630,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psType:
                switch transition {
@@ -607,13 +639,22 @@ func nextParamState(cur paramState, transition 
FnParamKind) (paramState, error)
                        return psBundleFinalization, nil
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psBundleFinalization:
                switch transition {
                case FnRTracker:
                        return psRTracker, nil
+               case FnStateProvider:
+                       return psStateProvider, nil
                }
        case psRTracker:
+               switch transition {
+               case FnStateProvider:
+                       return psStateProvider, nil
+               }
+       case psStateProvider:
                // Completely handled by the default clause
        case psInput:
                switch transition {
@@ -644,6 +685,8 @@ func nextParamState(cur paramState, transition FnParamKind) 
(paramState, error)
                return -1, errBundleFinalizationPrecedence
        case FnRTracker:
                return -1, errRTrackerPrecedence
+       case FnStateProvider:
+               return -1, errStateProviderPrecedence
        case FnIter, FnReIter, FnValue, FnMultiMap:
                return psInput, nil
        case FnEmit:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go 
b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 202de64b19d..9eee359c1e5 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -108,6 +109,11 @@ func TestNew(t *testing.T) {
                        Fn:    func(typex.PaneInfo, typex.Window, 
typex.EventTime, sdf.WatermarkEstimator, reflect.Type, []byte) {},
                        Param: []FnParamKind{FnPane, FnWindow, FnEventTime, 
FnWatermarkEstimator, FnType, FnValue},
                },
+               {
+                       Name:  "good10",
+                       Fn:    func(sdf.RTracker, state.Provider, []byte) {},
+                       Param: []FnParamKind{FnRTracker, FnStateProvider, 
FnValue},
+               },
                {
                        Name:  "good-method",
                        Fn:    foo{1}.Do,
@@ -211,6 +217,16 @@ func TestNew(t *testing.T) {
                        Fn:   func(int, func(int), func() func(*int) bool) {},
                        Err:  errInputPrecedence,
                },
+               {
+                       Name: "errInputPrecedence- StateProvider before 
RTracker",
+                       Fn:   func(state.Provider, sdf.RTracker, int) {},
+                       Err:  errRTrackerPrecedence,
+               },
+               {
+                       Name: "errInputPrecedence- StateProvider after output",
+                       Fn:   func(int, state.Provider) {},
+                       Err:  errStateProviderPrecedence,
+               },
                {
                        Name: "errInputPrecedence- input after output",
                        Fn:   func(int, func(int), int) {},
diff --git a/sdks/go/pkg/beam/core/graph/edge.go 
b/sdks/go/pkg/beam/core/graph/edge.go
index 5e18e7559ec..a9f1c8a092b 100644
--- a/sdks/go/pkg/beam/core/graph/edge.go
+++ b/sdks/go/pkg/beam/core/graph/edge.go
@@ -153,14 +153,15 @@ type MultiEdge struct {
        parent *Scope
 
        Op               Opcode
-       DoFn             *DoFn              // ParDo
-       RestrictionCoder *coder.Coder       // SplittableParDo
-       CombineFn        *CombineFn         // Combine
-       AccumCoder       *coder.Coder       // Combine
-       Value            []byte             // Impulse
-       External         *ExternalTransform // Current External Transforms API
-       Payload          *Payload           // Legacy External Transforms API
-       WindowFn         *window.Fn         // WindowInto
+       DoFn             *DoFn                   // ParDo
+       RestrictionCoder *coder.Coder            // SplittableParDo
+       StateCoders      map[string]*coder.Coder // Stateful ParDo
+       CombineFn        *CombineFn              // Combine
+       AccumCoder       *coder.Coder            // Combine
+       Value            []byte                  // Impulse
+       External         *ExternalTransform      // Current External Transforms 
API
+       Payload          *Payload                // Legacy External Transforms 
API
+       WindowFn         *window.Fn              // WindowInto
 
        Input  []*Inbound
        Output []*Outbound
diff --git a/sdks/go/pkg/beam/core/graph/fn.go 
b/sdks/go/pkg/beam/core/graph/fn.go
index 6b3656c1c25..2034349cdf2 100644
--- a/sdks/go/pkg/beam/core/graph/fn.go
+++ b/sdks/go/pkg/beam/core/graph/fn.go
@@ -21,6 +21,7 @@ import (
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/funcx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -282,6 +283,27 @@ func (f *DoFn) IsSplittable() bool {
        return ok
 }
 
+// PipelineState returns a list of PipelineState objects used to access/mutate 
global pipeline state (if any).
+func (f *DoFn) PipelineState() []state.PipelineState {
+       var s []state.PipelineState
+       if f.Recv == nil {
+               return s
+       }
+
+       v := reflect.Indirect(reflect.ValueOf(f.Recv))
+
+       for i := 0; i < v.NumField(); i++ {
+               f := v.Field(i)
+               if f.CanInterface() {
+                       if ps, ok := f.Interface().(state.PipelineState); ok {
+                               s = append(s, ps)
+                       }
+               }
+       }
+
+       return s
+}
+
 // SplittableDoFn represents a DoFn implementing SDF methods.
 type SplittableDoFn DoFn
 
@@ -561,7 +583,14 @@ func AsDoFn(fn *Fn, numMainIn mainInputs) (*DoFn, error) {
                }
        }
 
-       return (*DoFn)(fn), nil
+       doFn := (*DoFn)(fn)
+
+       err = validateState(doFn, numMainIn)
+       if err != nil {
+               return nil, addContext(err, fn)
+       }
+
+       return doFn, nil
 }
 
 // validateMainInputs checks that a method has the given number of main inputs
@@ -1221,6 +1250,50 @@ func validateStatefulWatermarkSig(fn *Fn, numMainIn int) 
error {
        return nil
 }
 
+func validateState(fn *DoFn, numIn mainInputs) error {
+       ps := fn.PipelineState()
+
+       if _, hasSp := fn.methods[processElementName].StateProvider(); hasSp {
+               if numIn == MainSingle {
+                       err := errors.Errorf("ProcessElement uses a 
StateProvider, but is not keyed")
+                       return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a StateProvider, but is not keyed. "+
+                               "All stateful DoFns must take a key/value pair 
as an input.")
+               }
+               if len(ps) == 0 {
+                       err := errors.Errorf("ProcessElement uses a 
StateProvider, but noState structs are attached to the DoFn")
+                       return errors.SetTopLevelMsgf(err, "ProcessElement uses 
a StateProvider, but no State structs are "+
+                               "attached to the DoFn. Ensure that you are 
including the State structs you're using to read/write"+
+                               "global state as public uppercase member 
variables.")
+               }
+               stateKeys := make(map[string]state.PipelineState)
+               for _, s := range ps {
+                       k := s.StateKey()
+                       if orig, ok := stateKeys[k]; ok {
+                               err := errors.Errorf("Duplicate state key %v", 
k)
+                               return errors.SetTopLevelMsgf(err, "Duplicate 
state key %v used by %v and %v. Ensure that state keys are"+
+                                       "unique per DoFn", k, orig, s)
+                       } else {
+                               stateKeys[k] = s
+                       }
+               }
+
+               // TODO(#22736) - Remove this once state is fully supported
+               err := errors.Errorf("ProcessElement uses a StateProvider, but 
state is not supported in this release.")
+               return errors.SetTopLevelMsgf(err, "ProcessElement uses a 
StateProvider, but state is not supported in this release. "+
+                       "Please try upgrading to a newer release if one exists 
or wait for state support to be released.")
+       } else {
+               if len(ps) > 0 {
+                       err := errors.Errorf("ProcessElement doesn't use a 
StateProvider, but State structs are attached to "+
+                               "the DoFn: %v", ps)
+                       return errors.SetTopLevelMsgf(err, "ProcessElement 
doesn't use a StateProvider, but State structs are "+
+                               "attached to the DoFn: %v\nEnsure that you are 
using the StateProvider to perform any reads or writes"+
+                               "of pipeline state.", ps)
+               }
+       }
+
+       return nil
+}
+
 // CombineFn represents a CombineFn.
 type CombineFn Fn
 
diff --git a/sdks/go/pkg/beam/core/graph/fn_test.go 
b/sdks/go/pkg/beam/core/graph/fn_test.go
index a1702175f64..2117c735dce 100644
--- a/sdks/go/pkg/beam/core/graph/fn_test.go
+++ b/sdks/go/pkg/beam/core/graph/fn_test.go
@@ -25,6 +25,7 @@ import (
        "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -52,6 +53,8 @@ func TestNewDoFn(t *testing.T) {
                        {dfn: &GoodDoFnCoGbk2{}, opt: CoGBKMainInput(3)},
                        {dfn: &GoodDoFnCoGbk7{}, opt: CoGBKMainInput(8)},
                        {dfn: &GoodDoFnCoGbk1wSide{}, opt: 
NumMainInputs(MainKv)},
+                       // TODO(#22736) - Enable this once stateful dofns are 
fully supported
+                       // {dfn: &GoodStatefulDoFn{State1: 
state.Value[int](state.MakeValueState[int]("state1"))}, opt: 
NumMainInputs(MainKv)},
                }
 
                for _, test := range tests {
@@ -70,7 +73,8 @@ func TestNewDoFn(t *testing.T) {
        })
        t.Run("invalid", func(t *testing.T) {
                tests := []struct {
-                       dfn interface{}
+                       dfn       interface{}
+                       numInputs int
                }{
                        // Validate main inputs.
                        {dfn: func() int { return 0 }}, // No inputs.
@@ -94,26 +98,44 @@ func TestNewDoFn(t *testing.T) {
                        {dfn: &BadDoFnReturnValuesInFinishBundle{}},
                        {dfn: &BadDoFnReturnValuesInSetup{}},
                        {dfn: &BadDoFnReturnValuesInTeardown{}},
+                       {dfn: &BadStatefulDoFnNoStateProvider{State1: 
state.Value[int](state.MakeValueState[int]("state1"))}},
+                       {dfn: &BadStatefulDoFnNoStateFields{}},
+                       {dfn: &BadStatefulDoFnNoKV{State1: 
state.Value[int](state.MakeValueState[int]("state1"))}, numInputs: 1},
                }
                for _, test := range tests {
                        t.Run(reflect.TypeOf(test.dfn).String(), func(t 
*testing.T) {
-                               if cfn, err := NewDoFn(test.dfn); err != nil {
-                                       t.Logf("NewDoFn failed as 
expected:\n%v", err)
-                               } else {
-                                       t.Errorf("NewDoFn(%v) = %v, want 
failure", cfn.Name(), cfn)
-                               }
-                               // If validation fails with unknown main 
inputs, then it should
-                               // always fail for any known number of main 
inputs, so test them
-                               // all. Error messages won't necessarily match.
-                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainSingle)); err != nil {
-                                       t.Logf("NewDoFn failed as 
expected:\n%v", err)
-                               } else {
-                                       t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
-                               }
-                               if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainKv)); err != nil {
-                                       t.Logf("NewDoFn failed as 
expected:\n%v", err)
-                               } else {
-                                       t.Errorf("NewDoFn(%v, 
NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn)
+                               switch test.numInputs {
+                               case 1:
+                                       if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainSingle)); err != nil {
+                                               t.Logf("NewDoFn failed as 
expected:\n%v", err)
+                                       } else {
+                                               t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+                                       }
+                               case 2:
+                                       if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainKv)); err != nil {
+                                               t.Logf("NewDoFn failed as 
expected:\n%v", err)
+                                       } else {
+                                               t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+                                       }
+                               default:
+                                       if cfn, err := NewDoFn(test.dfn); err 
!= nil {
+                                               t.Logf("NewDoFn failed as 
expected:\n%v", err)
+                                       } else {
+                                               t.Errorf("NewDoFn(%v) = %v, 
want failure", cfn.Name(), cfn)
+                                       }
+                                       // If validation fails with unknown 
main inputs, then it should
+                                       // always fail for any known number of 
main inputs, so test them
+                                       // all. Error messages won't 
necessarily match.
+                                       if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainSingle)); err != nil {
+                                               t.Logf("NewDoFn failed as 
expected:\n%v", err)
+                                       } else {
+                                               t.Errorf("NewDoFn(%v, 
NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+                                       }
+                                       if cfn, err := NewDoFn(test.dfn, 
NumMainInputs(MainKv)); err != nil {
+                                               t.Logf("NewDoFn failed as 
expected:\n%v", err)
+                                       } else {
+                                               t.Errorf("NewDoFn(%v, 
NumMainInputs(MainKv)) = %v, want failure", cfn.Name(), cfn)
+                                       }
                                }
                        })
                }
@@ -1058,6 +1080,14 @@ func (fn *GoodStatefulWatermarkEstimatingKv) 
WatermarkEstimatorState(estimator *
        return 0
 }
 
+type GoodStatefulDoFn struct {
+       State1 state.Value[int]
+}
+
+func (fn *GoodStatefulDoFn) ProcessElement(state.Provider, int, int) int {
+       return 0
+}
+
 // Examples of incorrect SDF signatures.
 // Examples with missing methods.
 
@@ -1422,6 +1452,29 @@ func (fn *BadManualWatermarkEstimatorMismatched) 
CreateWatermarkEstimator() *Wat
        return &WatermarkEstimatorT{}
 }
 
+type BadStatefulDoFnNoStateProvider struct {
+       State1 state.Value[int]
+}
+
+func (fn *BadStatefulDoFnNoStateProvider) ProcessElement(int, int) int {
+       return 0
+}
+
+type BadStatefulDoFnNoStateFields struct {
+}
+
+func (fn *BadStatefulDoFnNoStateFields) ProcessElement(state.Provider, int) 
int {
+       return 0
+}
+
+type BadStatefulDoFnNoKV struct {
+       State1 state.Value[int]
+}
+
+func (fn *BadStatefulDoFnNoKV) ProcessElement(state.Provider, int, int) int {
+       return 0
+}
+
 // Examples of correct CombineFn signatures
 
 type MyAccum struct{}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 48387f78c34..51f3296ca44 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 )
@@ -68,6 +69,21 @@ func (bf *bundleFinalizer) RegisterCallback(t time.Duration, 
cb func() error) {
        }
 }
 
+type stateProvider struct {
+}
+
+// ReadValueState reads a value state from the State API
+func (s *stateProvider) ReadValueState(userStateId string) (interface{}, 
[]state.Transaction, error) {
+       // TODO(#22736) - read from the state api.
+       return nil, nil, errors.New("Stateful DoFns are not supported yet.")
+}
+
+// WriteValueState writes a value state to the State API
+func (s *stateProvider) WriteValueState(val state.Transaction) error {
+       // TODO(#22736) - write to the state api.
+       return errors.New("Stateful DoFns are not supported yet.")
+}
+
 // Invoke invokes the fn with the given values. The extra values must match 
the non-main
 // side input and emitters. It returns the direct output, if any.
 func Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Window, ts 
typex.EventTime, fn *funcx.Fn, opt *MainInput, bf *bundleFinalizer, we 
sdf.WatermarkEstimator, extra ...interface{}) (*FullValue, error) {
@@ -92,10 +108,11 @@ func InvokeWithoutEventTime(ctx context.Context, fn 
*funcx.Fn, opt *MainInput, b
 type invoker struct {
        fn   *funcx.Fn
        args []interface{}
+       sp   *stateProvider
        // TODO(lostluck):  2018/07/06 consider replacing with a slice of 
functions to run over the args slice, as an improvement.
-       ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx int   // specialized input 
indexes
-       outEtIdx, outPcIdx, outErrIdx              int   // specialized output 
indexes
-       in, out                                    []int // general indexes
+       ctxIdx, pnIdx, wndIdx, etIdx, bfIdx, weIdx, spIdx int   // specialized 
input indexes
+       outEtIdx, outPcIdx, outErrIdx                     int   // specialized 
output indexes
+       in, out                                           []int // general 
indexes
 
        ret                     FullValue                     // ret is a 
cached allocation for passing to the next Unit. Units never modify the passed 
in FullValue.
        elmConvert, elm2Convert func(interface{}) interface{} // Cached 
conversion functions, which assums this invoker is always used with the same 
parameter types.
@@ -125,6 +142,9 @@ func newInvoker(fn *funcx.Fn) *invoker {
        if n.weIdx, ok = fn.WatermarkEstimator(); !ok {
                n.weIdx = -1
        }
+       if n.spIdx, ok = fn.StateProvider(); !ok {
+               n.spIdx = -1
+       }
        if n.outEtIdx, ok = fn.OutEventTime(); !ok {
                n.outEtIdx = -1
        }
@@ -188,6 +208,12 @@ func (n *invoker) Invoke(ctx context.Context, pn 
typex.PaneInfo, ws []typex.Wind
                args[n.weIdx] = we
        }
 
+       if n.spIdx >= 0 {
+               // TODO(#22736) - provide this with the variable access it 
needs to talk to the state api.
+               n.sp = &stateProvider{}
+               args[n.spIdx] = n.sp
+       }
+
        // (2) Main input from value, if any.
        i := 0
        if opt != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index e5b9976cb34..53ddf7843c2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -404,6 +404,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                urnTruncateSizedRestrictions:
                var data string
                var sides map[string]*pipepb.SideInput
+               var userState map[string]*pipepb.StateSpec
                switch urn {
                case graphx.URNParDo,
                        urnPairWithRestriction,
@@ -416,6 +417,7 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                        }
                        data = string(pardo.GetDoFn().GetPayload())
                        sides = pardo.GetSideInputs()
+                       userState = pardo.GetStateSpecs()
                case urnPerKeyCombinePre, urnPerKeyCombineMerge, 
urnPerKeyCombineExtract, urnPerKeyCombineConvert:
                        var cmb pipepb.CombinePayload
                        if err := proto.Unmarshal(payload, &cmb); err != nil {
@@ -462,6 +464,20 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                        n.PID = transform.GetUniqueName()
 
                                        input := 
unmarshalKeyedValues(transform.GetInputs())
+
+                                       if len(userState) > 0 {
+                                               stateIdToCoder := 
make(map[string]*coder.Coder)
+                                               for key, spec := range 
userState {
+                                                       // TODO(#22736) - this 
will eventually need to be aware of which type of state it's modifying to 
support non-Value state types.
+                                                       cID := 
spec.GetReadModifyWriteSpec().CoderId
+                                                       c, err := 
b.coders.Coder(cID)
+                                                       if err != nil {
+                                                               return nil, err
+                                                       }
+                                                       stateIdToCoder[key] = c
+                                               }
+                                       }
+
                                        for i := 1; i < len(input); i++ {
                                                // 
TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go 
b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
index 75ee58484fd..4fe1a9e1616 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/serialize.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
        v1pb "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/v1"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/jsonx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
@@ -517,6 +518,8 @@ func tryEncodeSpecial(t reflect.Type) (v1pb.Type_Special, 
bool) {
                return v1pb.Type_WINDOW, true
        case typex.BundleFinalizationType:
                return v1pb.Type_BUNDLEFINALIZATION, true
+       case state.ProviderType:
+               return v1pb.Type_STATEPROVIDER, true
        case typex.KVType:
                return v1pb.Type_KV, true
        case typex.CoGBKType:
@@ -681,6 +684,8 @@ func decodeSpecial(s v1pb.Type_Special) (reflect.Type, 
error) {
                return typex.WindowType, nil
        case v1pb.Type_BUNDLEFINALIZATION:
                return typex.BundleFinalizationType, nil
+       case v1pb.Type_STATEPROVIDER:
+               return state.ProviderType, nil
        case v1pb.Type_KV:
                return typex.KVType, nil
        case v1pb.Type_COGBK:
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 77797d756f1..653dacce7a3 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -70,6 +70,7 @@ const (
 
        URNRequiresSplittableDoFn     = 
"beam:requirement:pardo:splittable_dofn:v1"
        URNRequiresBundleFinalization = "beam:requirement:pardo:finalization:v1"
+       URNRequiresStatefulProcessing = "beam:requirement:pardo:stateful:v1"
        URNTruncate                   = 
"beam:transform:sdf_truncate_sized_restrictions:v1"
 
        // Deprecated: Determine worker binary based on GoWorkerBinary Role 
instead.
@@ -457,6 +458,29 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) 
([]string, error) {
                if _, ok := 
edge.Edge.DoFn.ProcessElementFn().BundleFinalization(); ok {
                        m.requirements[URNRequiresBundleFinalization] = true
                }
+               if _, ok := edge.Edge.DoFn.ProcessElementFn().StateProvider(); 
ok {
+                       m.requirements[URNRequiresStatefulProcessing] = true
+                       stateSpecs := make(map[string]*pipepb.StateSpec)
+                       for _, ps := range edge.Edge.DoFn.PipelineState() {
+                               coderId, err := 
m.coders.Add(edge.Edge.StateCoders[ps.StateKey()])
+                               if err != nil {
+                                       return handleErr(err)
+                               }
+                               stateSpecs[ps.StateKey()] = &pipepb.StateSpec{
+                                       // TODO (#22736) - make spec type and 
protocol conditional on type of State. Right now, assumes ValueState.
+                                       // See 
https://github.com/apache/beam/blob/54b0784da7ccba738deff22bd83fbc374ad21d2e/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go#L2635
+                                       Spec: 
&pipepb.StateSpec_ReadModifyWriteSpec{
+                                               ReadModifyWriteSpec: 
&pipepb.ReadModifyWriteStateSpec{
+                                                       CoderId: coderId,
+                                               },
+                                       },
+                                       Protocol: &pipepb.FunctionSpec{
+                                               Urn: "beam:user_state:bag:v1",
+                                       },
+                               }
+                       }
+                       payload.StateSpecs = stateSpecs
+               }
                spec = &pipepb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
                annotations = edge.Edge.DoFn.Annotations()
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go 
b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
index 413aa0f4d8c..e2e1e78c061 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.pb.go
@@ -24,7 +24,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
 //     protoc-gen-go v1.27.1
-//     protoc        v3.19.4
+//     protoc        v3.14.0
 // source: go/pkg/beam/core/runtime/graphx/v1/v1.proto
 
 package v1
@@ -218,6 +218,7 @@ const (
        Type_COGBK              Type_Special = 13
        Type_WINDOWEDVALUE      Type_Special = 14
        Type_BUNDLEFINALIZATION Type_Special = 23
+       Type_STATEPROVIDER      Type_Special = 24
        Type_T                  Type_Special = 15
        Type_U                  Type_Special = 16
        Type_V                  Type_Special = 17
@@ -240,6 +241,7 @@ var (
                13: "COGBK",
                14: "WINDOWEDVALUE",
                23: "BUNDLEFINALIZATION",
+               24: "STATEPROVIDER",
                15: "T",
                16: "U",
                17: "V",
@@ -259,6 +261,7 @@ var (
                "COGBK":              13,
                "WINDOWEDVALUE":      14,
                "BUNDLEFINALIZATION": 23,
+               "STATEPROVIDER":      24,
                "T":                  15,
                "U":                  16,
                "V":                  17,
@@ -1372,7 +1375,7 @@ var 
file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
        0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 
0x61, 0x6d, 0x2e, 0x73,
        0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 
0x65, 0x61, 0x6d, 0x2e,
        0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 
0x2e, 0x67, 0x72, 0x61,
-       0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xcb, 0x0b, 0x0a, 0x04, 0x54, 
0x79, 0x70, 0x65, 0x12,
+       0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x22, 0xde, 0x0b, 0x0a, 0x04, 0x54, 
0x79, 0x70, 0x65, 0x12,
        0x56, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 
0x0e, 0x32, 0x42, 0x2e,
        0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 
0x65, 0x61, 0x6d, 0x2e,
        0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 
0x62, 0x65, 0x61, 0x6d,
@@ -1453,7 +1456,7 @@ var 
file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
        0x52, 0x4e, 0x41, 0x4c, 0x10, 0x1a, 0x22, 0x27, 0x0a, 0x07, 0x43, 0x68, 
0x61, 0x6e, 0x44, 0x69,
        0x72, 0x12, 0x08, 0x0a, 0x04, 0x52, 0x45, 0x43, 0x56, 0x10, 0x00, 0x12, 
0x08, 0x0a, 0x04, 0x53,
        0x45, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x42, 0x4f, 0x54, 
0x48, 0x10, 0x02, 0x22,
-       0xc2, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 
0x0b, 0x0a, 0x07, 0x49,
+       0xd5, 0x01, 0x0a, 0x07, 0x53, 0x70, 0x65, 0x63, 0x69, 0x61, 0x6c, 0x12, 
0x0b, 0x0a, 0x07, 0x49,
        0x4c, 0x4c, 0x45, 0x47, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 
0x45, 0x52, 0x52, 0x4f,
        0x52, 0x10, 0x01, 0x12, 0x0b, 0x0a, 0x07, 0x43, 0x4f, 0x4e, 0x54, 0x45, 
0x58, 0x54, 0x10, 0x02,
        0x12, 0x08, 0x0a, 0x04, 0x54, 0x59, 0x50, 0x45, 0x10, 0x03, 0x12, 0x0d, 
0x0a, 0x09, 0x45, 0x56,
@@ -1462,175 +1465,176 @@ var 
file_go_pkg_beam_core_runtime_graphx_v1_v1_proto_rawDesc = []byte{
        0x05, 0x43, 0x4f, 0x47, 0x42, 0x4b, 0x10, 0x0d, 0x12, 0x11, 0x0a, 0x0d, 
0x57, 0x49, 0x4e, 0x44,
        0x4f, 0x57, 0x45, 0x44, 0x56, 0x41, 0x4c, 0x55, 0x45, 0x10, 0x0e, 0x12, 
0x16, 0x0a, 0x12, 0x42,
        0x55, 0x4e, 0x44, 0x4c, 0x45, 0x46, 0x49, 0x4e, 0x41, 0x4c, 0x49, 0x5a, 
0x41, 0x54, 0x49, 0x4f,
-       0x4e, 0x10, 0x17, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 0x0f, 0x12, 0x05, 
0x0a, 0x01, 0x55, 0x10,
-       0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 0x05, 0x0a, 0x01, 
0x57, 0x10, 0x12, 0x12,
-       0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 0x01, 0x59, 0x10, 
0x14, 0x12, 0x05, 0x0a,
-       0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 0x46, 0x75, 0x6c, 
0x6c, 0x54, 0x79, 0x70,
-       0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x0b, 0x32,
-       0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 
0x2e, 0x62, 0x65, 0x61,
-       0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 
0x67, 0x2e, 0x62, 0x65,
-       0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 
0x69, 0x6d, 0x65, 0x2e,
-       0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x79, 
0x70, 0x65, 0x52, 0x04,
-       0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 
0x6f, 0x6e, 0x65, 0x6e,
-       0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 
0x72, 0x67, 0x2e, 0x61,
+       0x4e, 0x10, 0x17, 0x12, 0x11, 0x0a, 0x0d, 0x53, 0x54, 0x41, 0x54, 0x45, 
0x50, 0x52, 0x4f, 0x56,
+       0x49, 0x44, 0x45, 0x52, 0x10, 0x18, 0x12, 0x05, 0x0a, 0x01, 0x54, 0x10, 
0x0f, 0x12, 0x05, 0x0a,
+       0x01, 0x55, 0x10, 0x10, 0x12, 0x05, 0x0a, 0x01, 0x56, 0x10, 0x11, 0x12, 
0x05, 0x0a, 0x01, 0x57,
+       0x10, 0x12, 0x12, 0x05, 0x0a, 0x01, 0x58, 0x10, 0x13, 0x12, 0x05, 0x0a, 
0x01, 0x59, 0x10, 0x14,
+       0x12, 0x05, 0x0a, 0x01, 0x5a, 0x10, 0x15, 0x22, 0xc0, 0x01, 0x0a, 0x08, 
0x46, 0x75, 0x6c, 0x6c,
+       0x54, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 
0x18, 0x01, 0x20, 0x01,
+       0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 
0x63, 0x68, 0x65, 0x2e,
+       0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 
0x2e, 0x70, 0x6b, 0x67,
+       0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 
0x75, 0x6e, 0x74, 0x69,
+       0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 
0x2e, 0x54, 0x79, 0x70,
+       0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x61, 0x0a, 0x0a, 0x63, 
0x6f, 0x6d, 0x70, 0x6f,
+       0x6e, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 
0x41, 0x2e, 0x6f, 0x72,
+       0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 
0x6d, 0x2e, 0x73, 0x64,
+       0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 
0x61, 0x6d, 0x2e, 0x63,
+       0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 
0x67, 0x72, 0x61, 0x70,
+       0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 
0x70, 0x65, 0x52, 0x0a,
+       0x63, 0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 
0x0a, 0x06, 0x55, 0x73,
+       0x65, 0x72, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 
0x18, 0x01, 0x20, 0x01,
+       0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 
0x74, 0x79, 0x70, 0x65,
+       0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 
0x2e, 0x61, 0x70, 0x61,
+       0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 
0x73, 0x2e, 0x67, 0x6f,
+       0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 
0x72, 0x65, 0x2e, 0x72,
+       0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 
0x78, 0x2e, 0x76, 0x31,
+       0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 
0x94, 0x01, 0x0a, 0x05,
+       0x44, 0x79, 0x6e, 0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 
0x65, 0x18, 0x01, 0x20,
+       0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 
0x04, 0x74, 0x79, 0x70,
+       0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 
0x67, 0x2e, 0x61, 0x70,
+       0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 
0x6b, 0x73, 0x2e, 0x67,
+       0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 
0x6f, 0x72, 0x65, 0x2e,
+       0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 
0x68, 0x78, 0x2e, 0x76,
+       0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 
0x12, 0x12, 0x0a, 0x04,
+       0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 
0x64, 0x61, 0x74, 0x61,
+       0x12, 0x10, 0x0a, 0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 
0x09, 0x52, 0x03, 0x67,
+       0x65, 0x6e, 0x22, 0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 
0x02, 0x66, 0x6e, 0x18,
+       0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
+       0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 
0x2e, 0x67, 0x6f, 0x2e,
+       0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 
0x65, 0x2e, 0x72, 0x75,
+       0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 
0x2e, 0x76, 0x31, 0x2e,
+       0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 
0x0a, 0x04, 0x74, 0x79,
+       0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 
0x72, 0x67, 0x2e, 0x61,
        0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 
0x64, 0x6b, 0x73, 0x2e,
        0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 
0x63, 0x6f, 0x72, 0x65,
        0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 
0x70, 0x68, 0x78, 0x2e,
-       0x76, 0x31, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 
0x0a, 0x63, 0x6f, 0x6d,
-       0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x6f, 0x0a, 0x06, 0x55, 
0x73, 0x65, 0x72, 0x46,
-       0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x09, 0x52,
-       0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 
0x65, 0x18, 0x02, 0x20,
-       0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
-       0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 
0x6f, 0x2e, 0x70, 0x6b,
-       0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 
0x72, 0x75, 0x6e, 0x74,
-       0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 
0x31, 0x2e, 0x54, 0x79,
-       0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x94, 0x01, 0x0a, 
0x05, 0x44, 0x79, 0x6e,
-       0x46, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 
0x20, 0x01, 0x28, 0x09,
+       0x76, 0x31, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 
0x65, 0x12, 0x10, 0x0a,
+       0x03, 0x6f, 0x70, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 
0x6f, 0x70, 0x74, 0x12,
+       0x54, 0x0a, 0x05, 0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 
0x28, 0x0b, 0x32, 0x3e,
+       0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 
0x62, 0x65, 0x61, 0x6d,
+       0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 
0x2e, 0x62, 0x65, 0x61,
+       0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 
0x6d, 0x65, 0x2e, 0x67,
+       0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 
0x46, 0x6e, 0x52, 0x05,
+       0x64, 0x79, 0x6e, 0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 
0x64, 0x6f, 0x77, 0x46,
+       0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x09, 0x52,
+       0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 
0x65, 0x5f, 0x6d, 0x73,
+       0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 
0x4d, 0x73, 0x12, 0x1b,
+       0x0a, 0x09, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 
0x03, 0x20, 0x01, 0x28,
+       0x03, 0x52, 0x08, 0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 
0x15, 0x0a, 0x06, 0x67,
+       0x61, 0x70, 0x5f, 0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 
0x05, 0x67, 0x61, 0x70,
+       0x4d, 0x73, 0x22, 0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 
0x6d, 0x43, 0x6f, 0x64,
+       0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 
0x20, 0x01, 0x28, 0x09,
        0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 
0x70, 0x65, 0x18, 0x02,
        0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 
0x70, 0x61, 0x63, 0x68,
        0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 
0x67, 0x6f, 0x2e, 0x70,
        0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 
0x2e, 0x72, 0x75, 0x6e,
        0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 
0x76, 0x31, 0x2e, 0x54,
-       0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 
0x04, 0x64, 0x61, 0x74,
-       0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 
0x61, 0x12, 0x10, 0x0a,
-       0x03, 0x67, 0x65, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 
0x67, 0x65, 0x6e, 0x22,
-       0x90, 0x02, 0x0a, 0x02, 0x46, 0x6e, 0x12, 0x4f, 0x0a, 0x02, 0x66, 0x6e, 
0x18, 0x01, 0x20, 0x01,
-       0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 
0x63, 0x68, 0x65, 0x2e,
-       0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 
0x2e, 0x70, 0x6b, 0x67,
-       0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 
0x75, 0x6e, 0x74, 0x69,
-       0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 
0x2e, 0x55, 0x73, 0x65,
-       0x72, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x51, 0x0a, 0x04, 0x74, 
0x79, 0x70, 0x65, 0x18,
-       0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
+       0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 
0x03, 0x65, 0x6e, 0x63,
+       0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 
0x2e, 0x61, 0x70, 0x61,
+       0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 
0x73, 0x2e, 0x67, 0x6f,
+       0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 
0x72, 0x65, 0x2e, 0x72,
+       0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 
0x78, 0x2e, 0x76, 0x31,
+       0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 
0x12, 0x51, 0x0a, 0x03,
+       0x64, 0x65, 0x63, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 
0x6f, 0x72, 0x67, 0x2e,
+       0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 
0x73, 0x64, 0x6b, 0x73,
+       0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 
0x2e, 0x63, 0x6f, 0x72,
+       0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 
0x61, 0x70, 0x68, 0x78,
+       0x2e, 0x76, 0x31, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 
0x64, 0x65, 0x63, 0x22,
+       0xba, 0x06, 0x0a, 0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 
0x65, 0x12, 0x4b, 0x0a,
+       0x02, 0x66, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 
0x6f, 0x72, 0x67, 0x2e,
+       0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 
0x73, 0x64, 0x6b, 0x73,
+       0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 
0x2e, 0x63, 0x6f, 0x72,
+       0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 
0x61, 0x70, 0x68, 0x78,
+       0x2e, 0x76, 0x31, 0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 
0x0a, 0x06, 0x6f, 0x70,
+       0x63, 0x6f, 0x64, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 
0x6f, 0x70, 0x63, 0x6f,
+       0x64, 0x65, 0x12, 0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 
0x5f, 0x66, 0x6e, 0x18,
+       0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
        0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 
0x2e, 0x67, 0x6f, 0x2e,
        0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 
0x65, 0x2e, 0x72, 0x75,
        0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 
0x2e, 0x76, 0x31, 0x2e,
-       0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x10, 
0x0a, 0x03, 0x6f, 0x70,
-       0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6f, 0x70, 0x74, 
0x12, 0x54, 0x0a, 0x05,
-       0x64, 0x79, 0x6e, 0x66, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 
0x3e, 0x2e, 0x6f, 0x72,
+       0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 
0x6e, 0x64, 0x6f, 0x77,
+       0x46, 0x6e, 0x12, 0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 
0x64, 0x18, 0x02, 0x20,
+       0x03, 0x28, 0x0b, 0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
+       0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 
0x6f, 0x2e, 0x70, 0x6b,
+       0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 
0x72, 0x75, 0x6e, 0x74,
+       0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 
0x31, 0x2e, 0x4d, 0x75,
+       0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 
0x75, 0x6e, 0x64, 0x52,
+       0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 
0x6f, 0x75, 0x74, 0x62,
+       0x6f, 0x75, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 
0x2e, 0x6f, 0x72, 0x67,
+       0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 
0x2e, 0x73, 0x64, 0x6b,
+       0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 
0x6d, 0x2e, 0x63, 0x6f,
+       0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 
0x72, 0x61, 0x70, 0x68,
+       0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 
0x67, 0x65, 0x2e, 0x4f,
+       0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 
0x62, 0x6f, 0x75, 0x6e,
+       0x64, 0x1a, 0xb5, 0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 
0x64, 0x12, 0x68, 0x0a,
+       0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 
0x54, 0x2e, 0x6f, 0x72,
        0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 
0x6d, 0x2e, 0x73, 0x64,
        0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 
0x61, 0x6d, 0x2e, 0x63,
        0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 
0x67, 0x72, 0x61, 0x70,
-       0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x79, 0x6e, 0x46, 0x6e, 0x52, 
0x05, 0x64, 0x79, 0x6e,
-       0x66, 0x6e, 0x22, 0x6b, 0x0a, 0x08, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 
0x46, 0x6e, 0x12, 0x12,
-       0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 
0x52, 0x04, 0x6b, 0x69,
-       0x6e, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x69, 0x7a, 0x65, 0x5f, 0x6d, 
0x73, 0x18, 0x02, 0x20,
-       0x01, 0x28, 0x03, 0x52, 0x06, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x73, 0x12, 
0x1b, 0x0a, 0x09, 0x70,
-       0x65, 0x72, 0x69, 0x6f, 0x64, 0x5f, 0x6d, 0x73, 0x18, 0x03, 0x20, 0x01, 
0x28, 0x03, 0x52, 0x08,
-       0x70, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x4d, 0x73, 0x12, 0x15, 0x0a, 0x06, 
0x67, 0x61, 0x70, 0x5f,
-       0x6d, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x67, 0x61, 
0x70, 0x4d, 0x73, 0x22,
-       0x9a, 0x02, 0x0a, 0x0b, 0x43, 0x75, 0x73, 0x74, 0x6f, 0x6d, 0x43, 0x6f, 
0x64, 0x65, 0x72, 0x12,
-       0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 
0x09, 0x52, 0x04, 0x6e,
-       0x61, 0x6d, 0x65, 0x12, 0x51, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 
0x02, 0x20, 0x01, 0x28,
-       0x0b, 0x32, 0x3d, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 
0x68, 0x65, 0x2e, 0x62,
-       0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 
0x70, 0x6b, 0x67, 0x2e,
-       0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 
0x6e, 0x74, 0x69, 0x6d,
-       0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 
0x54, 0x79, 0x70, 0x65,
-       0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x51, 0x0a, 0x03, 0x65, 0x6e, 
0x63, 0x18, 0x03, 0x20,
-       0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
+       0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 
0x64, 0x67, 0x65, 0x2e,
+       0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 
0x74, 0x4b, 0x69, 0x6e,
+       0x64, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 
0x79, 0x70, 0x65, 0x18,
+       0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
+       0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 
0x2e, 0x67, 0x6f, 0x2e,
+       0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 
0x65, 0x2e, 0x72, 0x75,
+       0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 
0x2e, 0x76, 0x31, 0x2e,
+       0x46, 0x75, 0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 
0x70, 0x65, 0x22, 0x69,
+       0x0a, 0x09, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 
0x0b, 0x0a, 0x07, 0x49,
+       0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 
0x4d, 0x41, 0x49, 0x4e,
+       0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 
0x54, 0x4f, 0x4e, 0x10,
+       0x02, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 
0x12, 0x07, 0x0a, 0x03,
+       0x4d, 0x41, 0x50, 0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 
0x54, 0x49, 0x4d, 0x41,
+       0x50, 0x10, 0x05, 0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 
0x06, 0x12, 0x0a, 0x0a,
+       0x06, 0x52, 0x45, 0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 
0x08, 0x4f, 0x75, 0x74,
+       0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 
0x65, 0x18, 0x01, 0x20,
+       0x01, 0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
        0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 
0x6f, 0x2e, 0x70, 0x6b,
        0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 
0x72, 0x75, 0x6e, 0x74,
-       0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 
0x31, 0x2e, 0x55, 0x73,
-       0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x65, 0x6e, 0x63, 0x12, 0x51, 0x0a, 
0x03, 0x64, 0x65, 0x63,
-       0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3f, 0x2e, 0x6f, 0x72, 0x67, 
0x2e, 0x61, 0x70, 0x61,
-       0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 
0x73, 0x2e, 0x67, 0x6f,
-       0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 
0x72, 0x65, 0x2e, 0x72,
-       0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 
0x78, 0x2e, 0x76, 0x31,
-       0x2e, 0x55, 0x73, 0x65, 0x72, 0x46, 0x6e, 0x52, 0x03, 0x64, 0x65, 0x63, 
0x22, 0xba, 0x06, 0x0a,
-       0x09, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x12, 0x4b, 
0x0a, 0x02, 0x66, 0x6e,
-       0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x3b, 0x2e, 0x6f, 0x72, 0x67, 
0x2e, 0x61, 0x70, 0x61,
-       0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 
0x73, 0x2e, 0x67, 0x6f,
-       0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 
0x72, 0x65, 0x2e, 0x72,
-       0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 
0x78, 0x2e, 0x76, 0x31,
-       0x2e, 0x46, 0x6e, 0x52, 0x02, 0x66, 0x6e, 0x12, 0x16, 0x0a, 0x06, 0x6f, 
0x70, 0x63, 0x6f, 0x64,
-       0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x6f, 0x70, 0x63, 
0x6f, 0x64, 0x65, 0x12,
-       0x5e, 0x0a, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x66, 0x6e, 
0x18, 0x05, 0x20, 0x01,
-       0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 
0x63, 0x68, 0x65, 0x2e,
-       0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 
0x2e, 0x70, 0x6b, 0x67,
-       0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 
0x75, 0x6e, 0x74, 0x69,
-       0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 
0x2e, 0x57, 0x69, 0x6e,
-       0x64, 0x6f, 0x77, 0x46, 0x6e, 0x52, 0x08, 0x77, 0x69, 0x6e, 0x64, 0x6f, 
0x77, 0x46, 0x6e, 0x12,
-       0x64, 0x0a, 0x07, 0x69, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x02, 
0x20, 0x03, 0x28, 0x0b,
-       0x32, 0x4a, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 
0x65, 0x2e, 0x62, 0x65,
-       0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 
0x6b, 0x67, 0x2e, 0x62,
-       0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 
0x74, 0x69, 0x6d, 0x65,
-       0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 
0x75, 0x6c, 0x74, 0x69,
-       0x45, 0x64, 0x67, 0x65, 0x2e, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 
0x52, 0x07, 0x69, 0x6e,
-       0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x67, 0x0a, 0x08, 0x6f, 0x75, 0x74, 
0x62, 0x6f, 0x75, 0x6e,
-       0x64, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x4b, 0x2e, 0x6f, 0x72, 
0x67, 0x2e, 0x61, 0x70,
-       0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 
0x6b, 0x73, 0x2e, 0x67,
-       0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 
0x6f, 0x72, 0x65, 0x2e,
-       0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 
0x68, 0x78, 0x2e, 0x76,
-       0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x2e, 
0x4f, 0x75, 0x74, 0x62,
-       0x6f, 0x75, 0x6e, 0x64, 0x52, 0x08, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 
0x6e, 0x64, 0x1a, 0xb5,
-       0x02, 0x0a, 0x07, 0x49, 0x6e, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x68, 
0x0a, 0x04, 0x6b, 0x69,
-       0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x54, 0x2e, 0x6f, 
0x72, 0x67, 0x2e, 0x61,
+       0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 
0x31, 0x2e, 0x46, 0x75,
+       0x6c, 0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 
0x22, 0x1d, 0x0a, 0x0d,
+       0x49, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 
0x64, 0x12, 0x0c, 0x0a,
+       0x01, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 
0xf5, 0x01, 0x0a, 0x10,
+       0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 
0x6c, 0x6f, 0x61, 0x64,
+       0x12, 0x19, 0x0a, 0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 
0x18, 0x01, 0x20, 0x01,
+       0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 
0x83, 0x01, 0x0a, 0x0e,
+       0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 
0x64, 0x73, 0x18, 0x02,
+       0x20, 0x03, 0x28, 0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 
0x70, 0x61, 0x63, 0x68,
+       0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 
0x67, 0x6f, 0x2e, 0x70,
+       0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 
0x2e, 0x72, 0x75, 0x6e,
+       0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 
0x76, 0x31, 0x2e, 0x52,
+       0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 
0x6f, 0x61, 0x64, 0x2e,
+       0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 
0x73, 0x45, 0x6e, 0x74,
+       0x72, 0x79, 0x52, 0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 
0x6c, 0x6f, 0x61, 0x64,
+       0x73, 0x1a, 0x40, 0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 
0x79, 0x6c, 0x6f, 0x61,
+       0x64, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 
0x65, 0x79, 0x18, 0x01,
+       0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 
0x05, 0x76, 0x61, 0x6c,
+       0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 
0x6c, 0x75, 0x65, 0x3a,
+       0x02, 0x38, 0x01, 0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 
0x73, 0x66, 0x6f, 0x72,
+       0x6d, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 
0x75, 0x72, 0x6e, 0x18,
+       0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 
0x0a, 0x04, 0x65, 0x64,
+       0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 
0x72, 0x67, 0x2e, 0x61,
        0x70, 0x61, 0x63, 0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 
0x64, 0x6b, 0x73, 0x2e,
        0x67, 0x6f, 0x2e, 0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 
0x63, 0x6f, 0x72, 0x65,
        0x2e, 0x72, 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 
0x70, 0x68, 0x78, 0x2e,
-       0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 
0x2e, 0x49, 0x6e, 0x62,
-       0x6f, 0x75, 0x6e, 0x64, 0x2e, 0x49, 0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 
0x6e, 0x64, 0x52, 0x04,
-       0x6b, 0x69, 0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 
0x18, 0x02, 0x20, 0x01,
-       0x28, 0x0b, 0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 
0x63, 0x68, 0x65, 0x2e,
+       0x76, 0x31, 0x2e, 0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 
0x52, 0x04, 0x65, 0x64,
+       0x67, 0x65, 0x12, 0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 
0x18, 0x03, 0x20, 0x01,
+       0x28, 0x0b, 0x32, 0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 
0x63, 0x68, 0x65, 0x2e,
        0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 
0x2e, 0x70, 0x6b, 0x67,
        0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 
0x75, 0x6e, 0x74, 0x69,
-       0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 
0x2e, 0x46, 0x75, 0x6c,
-       0x6c, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 
0x69, 0x0a, 0x09, 0x49,
-       0x6e, 0x70, 0x75, 0x74, 0x4b, 0x69, 0x6e, 0x64, 0x12, 0x0b, 0x0a, 0x07, 
0x49, 0x4e, 0x56, 0x41,
-       0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x41, 0x49, 
0x4e, 0x10, 0x01, 0x12,
-       0x0d, 0x0a, 0x09, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x54, 0x4f, 0x4e, 
0x10, 0x02, 0x12, 0x09,
-       0x0a, 0x05, 0x53, 0x4c, 0x49, 0x43, 0x45, 0x10, 0x03, 0x12, 0x07, 0x0a, 
0x03, 0x4d, 0x41, 0x50,
-       0x10, 0x04, 0x12, 0x0c, 0x0a, 0x08, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x4d, 
0x41, 0x50, 0x10, 0x05,
-       0x12, 0x08, 0x0a, 0x04, 0x49, 0x54, 0x45, 0x52, 0x10, 0x06, 0x12, 0x0a, 
0x0a, 0x06, 0x52, 0x45,
-       0x49, 0x54, 0x45, 0x52, 0x10, 0x07, 0x1a, 0x61, 0x0a, 0x08, 0x4f, 0x75, 
0x74, 0x62, 0x6f, 0x75,
-       0x6e, 0x64, 0x12, 0x55, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 
0x20, 0x01, 0x28, 0x0b,
-       0x32, 0x41, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 
0x65, 0x2e, 0x62, 0x65,
-       0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 
0x6b, 0x67, 0x2e, 0x62,
-       0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 
0x74, 0x69, 0x6d, 0x65,
-       0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x46, 
0x75, 0x6c, 0x6c, 0x54,
-       0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x1d, 0x0a, 
0x0d, 0x49, 0x6e, 0x6a,
-       0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0c, 
0x0a, 0x01, 0x6e, 0x18,
-       0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x01, 0x6e, 0x22, 0xf5, 0x01, 0x0a, 
0x10, 0x52, 0x65, 0x73,
-       0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 
0x64, 0x12, 0x19, 0x0a,
-       0x08, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 
0x01, 0x28, 0x09, 0x52,
-       0x07, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x49, 0x64, 0x12, 0x83, 0x01, 0x0a, 
0x0e, 0x63, 0x6f, 0x64,
-       0x65, 0x72, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x18, 
0x02, 0x20, 0x03, 0x28,
-       0x0b, 0x32, 0x5c, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 
0x68, 0x65, 0x2e, 0x62,
-       0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 
0x70, 0x6b, 0x67, 0x2e,
-       0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 
0x6e, 0x74, 0x69, 0x6d,
-       0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 
0x52, 0x65, 0x73, 0x68,
-       0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 
0x2e, 0x43, 0x6f, 0x64,
-       0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x73, 0x45, 0x6e, 
0x74, 0x72, 0x79, 0x52,
-       0x0d, 0x63, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 
0x64, 0x73, 0x1a, 0x40,
-       0x0a, 0x12, 0x43, 0x6f, 0x64, 0x65, 0x72, 0x50, 0x61, 0x79, 0x6c, 0x6f, 
0x61, 0x64, 0x73, 0x45,
-       0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 
0x01, 0x20, 0x01, 0x28,
-       0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 
0x6c, 0x75, 0x65, 0x18,
-       0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 
0x3a, 0x02, 0x38, 0x01,
-       0x22, 0xc5, 0x02, 0x0a, 0x10, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 
0x72, 0x6d, 0x50, 0x61,
-       0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6e, 
0x18, 0x01, 0x20, 0x01,
-       0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6e, 0x12, 0x56, 0x0a, 0x04, 0x65, 
0x64, 0x67, 0x65, 0x18,
-       0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x42, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
+       0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 
0x2e, 0x49, 0x6e, 0x6a,
+       0x65, 0x63, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 
0x69, 0x6e, 0x6a, 0x65,
+       0x63, 0x74, 0x12, 0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 
0x66, 0x6c, 0x65, 0x18,
+       0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 
0x61, 0x70, 0x61, 0x63,
        0x68, 0x65, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 
0x2e, 0x67, 0x6f, 0x2e,
        0x70, 0x6b, 0x67, 0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 
0x65, 0x2e, 0x72, 0x75,
        0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 
0x2e, 0x76, 0x31, 0x2e,
-       0x4d, 0x75, 0x6c, 0x74, 0x69, 0x45, 0x64, 0x67, 0x65, 0x52, 0x04, 0x65, 
0x64, 0x67, 0x65, 0x12,
-       0x5e, 0x0a, 0x06, 0x69, 0x6e, 0x6a, 0x65, 0x63, 0x74, 0x18, 0x03, 0x20, 
0x01, 0x28, 0x0b, 0x32,
-       0x46, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 
0x2e, 0x62, 0x65, 0x61,
-       0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 0x2e, 0x70, 0x6b, 
0x67, 0x2e, 0x62, 0x65,
-       0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 0x75, 0x6e, 0x74, 
0x69, 0x6d, 0x65, 0x2e,
-       0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6e, 
0x6a, 0x65, 0x63, 0x74,
-       0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x06, 0x69, 0x6e, 0x6a, 
0x65, 0x63, 0x74, 0x12,
-       0x67, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 
0x18, 0x04, 0x20, 0x01,
-       0x28, 0x0b, 0x32, 0x49, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x61, 0x70, 0x61, 
0x63, 0x68, 0x65, 0x2e,
-       0x62, 0x65, 0x61, 0x6d, 0x2e, 0x73, 0x64, 0x6b, 0x73, 0x2e, 0x67, 0x6f, 
0x2e, 0x70, 0x6b, 0x67,
-       0x2e, 0x62, 0x65, 0x61, 0x6d, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x72, 
0x75, 0x6e, 0x74, 0x69,
-       0x6d, 0x65, 0x2e, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2e, 0x76, 0x31, 
0x2e, 0x52, 0x65, 0x73,
-       0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 
0x64, 0x52, 0x09, 0x72,
-       0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 0x46, 0x5a, 0x44, 
0x67, 0x69, 0x74, 0x68,
-       0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 0x61, 0x63, 0x68, 
0x65, 0x2f, 0x62, 0x65,
-       0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x67, 
0x6f, 0x2f, 0x70, 0x6b,
-       0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 
0x72, 0x75, 0x6e, 0x74,
-       0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 0x78, 0x2f, 0x76, 
0x31, 0x3b, 0x76, 0x31,
-       0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+       0x52, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x50, 0x61, 0x79, 
0x6c, 0x6f, 0x61, 0x64,
+       0x52, 0x09, 0x72, 0x65, 0x73, 0x68, 0x75, 0x66, 0x66, 0x6c, 0x65, 0x42, 
0x46, 0x5a, 0x44, 0x67,
+       0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x70, 
0x61, 0x63, 0x68, 0x65,
+       0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x64, 0x6b, 0x73, 0x2f, 0x76, 
0x32, 0x2f, 0x67, 0x6f,
+       0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, 0x61, 0x6d, 0x2f, 0x63, 0x6f, 
0x72, 0x65, 0x2f, 0x72,
+       0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x67, 0x72, 0x61, 0x70, 0x68, 
0x78, 0x2f, 0x76, 0x31,
+       0x3b, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto 
b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
index ef84c20887a..5046994707b 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
+++ b/sdks/go/pkg/beam/core/runtime/graphx/v1/v1.proto
@@ -114,6 +114,8 @@ message Type {
 
         BUNDLEFINALIZATION = 23;
 
+        STATEPROVIDER = 24;
+
         T = 15;
         U = 16;
         V = 17;
diff --git a/sdks/go/pkg/beam/core/state/state.go 
b/sdks/go/pkg/beam/core/state/state.go
new file mode 100644
index 00000000000..62fd4509936
--- /dev/null
+++ b/sdks/go/pkg/beam/core/state/state.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package state contains structs for reading and manipulating pipeline state.
+package state
+
+import (
+       "fmt"
+       "reflect"
+)
+
+type TransactionType_Enum int32 // TransactionType_Enum identifies which operation a Transaction records.
+
+const (
+       TransactionType_Set   TransactionType_Enum = 0 // TransactionType_Set: the transaction stores Val; Value.Read adopts Val as the current value.
+       TransactionType_Clear TransactionType_Enum = 1 // TransactionType_Clear: the transaction removes the value; Value.Read then reports it as absent.
+)
+
+var (
+       ProviderType = reflect.TypeOf((*Provider)(nil)).Elem() // ProviderType is the reflect.Type of the Provider interface itself (obtained via a nil *Provider).
+)
+
+// TODO(#20510) - add other forms of state (MapState, BagState, 
CombiningState), prefetch, and clear.
+
+// Transaction is used to represent a pending state transaction. This should 
not be manipulated directly;
+// it is primarily used for implementations of the Provider interface to talk 
to the various State objects.
+type Transaction struct {
+       Key  string // Key is the key of the state entry the transaction applies to.
+       Type TransactionType_Enum // Type selects the operation (set or clear).
+       Val  interface{} // Val is the value stored by a set transaction; Value.Read ignores it for a clear.
+}
+
+// Provider represents the DoFn parameter used to get and manipulate pipeline 
state
+// stored as key value pairs 
(https://beam.apache.org/documentation/programming-guide/#state-and-timers).
+// This should not be manipulated directly. Instead it should be used as a 
parameter
+// to functions on State objects like state.Value.
+type Provider interface {
+       ReadValueState(id string) (interface{}, []Transaction, error) // Returns the value for id plus the transactions Value.Read replays, in order, on top of it.
+       WriteValueState(val Transaction) error // Receives the transaction that Value.Write builds for a new value.
+}
+
+// PipelineState is an interface representing different kinds of PipelineState 
(currently just state.Value).
+// It is primarily meant for Beam packages to use and is probably not useful 
for most pipeline authors.
+type PipelineState interface {
+       StateKey() string // StateKey returns the key identifying this state entry.
+       CoderType() reflect.Type // CoderType returns the type of the stored values, used to choose a coder for them.
+}
+
+// Value is used to read and write global pipeline state representing a single 
value.
+// Key represents the key used to lookup this state.
+type Value[T any] struct {
+       Key string // Key is sent to the Provider on every Read and Write; StateKey panics while it is empty.
+}
+
+// Write is used to write this instance of global pipeline state representing 
a single value.
+func (s *Value[T]) Write(p Provider, val T) error {
+       // Package the new value as a set transaction addressed to this state's
+       // key, then hand it to the provider and surface whatever it reports.
+       set := Transaction{Key: s.Key, Type: TransactionType_Set, Val: val}
+       return p.WriteValueState(set)
+}
+
+// Read is used to read this instance of global pipeline state representing a 
single value.
+// When a value is not found, returns the 0 value and false.
+func (s *Value[T]) Read(p Provider) (T, bool, error) {
+       // zero is what callers get back whenever no usable value exists.
+       var zero T
+
+       // The provider hands back the last known value together with the
+       // transactions buffered since then; replaying them in order yields the
+       // value as it currently stands.
+       cur, bufferedTransactions, err := p.ReadValueState(s.Key)
+       if err != nil {
+               return zero, false, err
+       }
+       for _, t := range bufferedTransactions {
+               switch t.Type {
+               case TransactionType_Set:
+                       cur = t.Val
+               case TransactionType_Clear:
+                       cur = nil
+               }
+       }
+       if cur == nil {
+               return zero, false, nil
+       }
+
+       // Use a checked assertion so that a value of the wrong dynamic type is
+       // reported through the error return instead of panicking.
+       val, ok := cur.(T)
+       if !ok {
+               want := reflect.TypeOf((*T)(nil)).Elem()
+               return zero, false, fmt.Errorf(
+                       "state %q holds a value of type %T, want %v", s.Key, cur, want)
+       }
+       return val, true, nil
+}
+
+// StateKey returns the key for this pipeline state entry.
+func (s Value[T]) StateKey() string {
+       if s.Key == "" { // no key was ever assigned, so this state cannot be addressed
+               // TODO(#22736) - infer the state from the member variable name 
during pipeline construction.
+               panic("Value state exists on struct but has not been 
initialized with a key.")
+       }
+       return s.Key
+}
+
+// CoderType returns the type of the value state which should be used for a 
coder.
+func (s Value[T]) CoderType() reflect.Type {
+       var t T
+       return reflect.TypeOf(t)
+}
+
+// MakeValueState is a factory function to create an instance of ValueState 
with the given key.
+func MakeValueState[T any](k string) Value[T] {
+       // A Value carries nothing but its key, so a keyed literal is all it takes.
+       return Value[T]{Key: k}
+}
diff --git a/sdks/go/pkg/beam/core/state/state_test.go 
b/sdks/go/pkg/beam/core/state/state_test.go
new file mode 100644
index 00000000000..b1297891c1c
--- /dev/null
+++ b/sdks/go/pkg/beam/core/state/state_test.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package state
+
+import (
+       "errors"
+       "testing"
+)
+
+var (
+       errFake = errors.New("fake error") // errFake is the sentinel the tests register in fakeProvider.err to force a read failure.
+)
+
+type fakeProvider struct {
+       initialState map[string]interface{} // per key: the base value ReadValueState returns
+       transactions map[string][]Transaction // per key: transactions returned by ReadValueState and appended to by WriteValueState
+       err          map[string]error // per key: if present, ReadValueState returns this error and nothing else
+}
+
+func (s *fakeProvider) ReadValueState(userStateId string) (interface{}, []Transaction, error) {
+       // A configured error takes precedence over any stored state.
+       if failure, found := s.err[userStateId]; found {
+               return nil, nil, failure
+       }
+       pending, found := s.transactions[userStateId]
+       if !found {
+               // Keys with no entry report an empty, non-nil transaction list.
+               pending = []Transaction{}
+       }
+       return s.initialState[userStateId], pending, nil
+}
+
+func (s *fakeProvider) WriteValueState(val Transaction) error {
+       // Appending to the nil slice of a missing key yields a one-element
+       // slice, so new and existing keys share a single code path.
+       s.transactions[val.Key] = append(s.transactions[val.Key], val)
+       return nil
+}
+
+// TestValueRead checks that Value.Read replays buffered transactions on top
+// of the provider's stored value and passes provider errors through.
+func TestValueRead(t *testing.T) {
+       set := func(key string, val int) Transaction {
+               return Transaction{Key: key, Type: TransactionType_Set, Val: val}
+       }
+       clearOf := func(key string) Transaction {
+               return Transaction{Key: key, Type: TransactionType_Clear, Val: nil}
+       }
+
+       ts := make(map[string][]Transaction)
+       ts["no_transactions"] = nil
+       ts["basic_set"] = []Transaction{set("basic_set", 3)}
+       ts["basic_clear"] = []Transaction{clearOf("basic_clear")}
+       ts["set_then_clear"] = []Transaction{
+               set("set_then_clear", 3),
+               clearOf("set_then_clear"),
+       }
+       ts["set_then_clear_then_set"] = []Transaction{
+               set("set_then_clear_then_set", 3),
+               clearOf("set_then_clear_then_set"),
+               set("set_then_clear_then_set", 4),
+       }
+       ts["err"] = []Transaction{set("err", 3)}
+
+       // Every key starts from a stored value of 1, so the cases differ only in
+       // the transactions buffered on top of it.
+       is := make(map[string]interface{})
+       for key := range ts {
+               is[key] = 1
+       }
+
+       es := make(map[string]error)
+       es["err"] = errFake
+
+       f := fakeProvider{
+               initialState: is,
+               transactions: ts,
+               err:          es,
+       }
+
+       var tests = []struct {
+               vs  Value[int]
+               val int
+               ok  bool
+               err error
+       }{
+               {MakeValueState[int]("no_transactions"), 1, true, nil},
+               {MakeValueState[int]("basic_set"), 3, true, nil},
+               {MakeValueState[int]("basic_clear"), 0, false, nil},
+               {MakeValueState[int]("set_then_clear"), 0, false, nil},
+               {MakeValueState[int]("set_then_clear_then_set"), 4, true, nil},
+               {MakeValueState[int]("err"), 0, false, errFake},
+       }
+
+       for _, tt := range tests {
+               val, ok, err := tt.vs.Read(&f)
+               switch {
+               case err != nil && tt.err == nil:
+                       t.Errorf("Value.Read() returned error %v for state key %v when it shouldn't have", err, tt.vs.Key)
+               case err == nil && tt.err != nil:
+                       // Report the expected error; err is always nil in this branch.
+                       t.Errorf("Value.Read() returned no error for state key %v when it should have returned %v", tt.vs.Key, tt.err)
+               case !errors.Is(err, tt.err):
+                       // Both errors are non-nil here, so make sure it is the right one.
+                       t.Errorf("Value.Read() returned error %v for state key %v, want %v", err, tt.vs.Key, tt.err)
+               case ok && !tt.ok:
+                       t.Errorf("Value.Read() returned a value %v for state key %v when it shouldn't have", val, tt.vs.Key)
+               case !ok && tt.ok:
+                       t.Errorf("Value.Read() didn't return a value for state key %v when it should have returned %v", tt.vs.Key, tt.val)
+               case val != tt.val:
+                       t.Errorf("Value.Read()=%v, want %v for state key %v", val, tt.val, tt.vs.Key)
+               }
+       }
+}
+
+// TestValueWrite checks that values written through Value.Write are visible
+// to a later Value.Read on the same provider, with the last write winning.
+func TestValueWrite(t *testing.T) {
+       var tests = []struct {
+               writes []int
+               val    int
+               ok     bool
+       }{
+               {[]int{}, 0, false},
+               {[]int{3}, 3, true},
+               {[]int{1, 5}, 5, true},
+       }
+
+       for _, tt := range tests {
+               // A fresh provider per case keeps writes from leaking between cases.
+               f := fakeProvider{
+                       initialState: make(map[string]interface{}),
+                       transactions: make(map[string][]Transaction),
+                       err:          make(map[string]error),
+               }
+               vs := MakeValueState[int]("vs")
+               for _, val := range tt.writes {
+                       // A failed write would invalidate the read checks below, so
+                       // report it rather than dropping the error.
+                       if err := vs.Write(&f, val); err != nil {
+                               t.Errorf("Value.Write(%v) returned error %v when it shouldn't have", val, err)
+                       }
+               }
+               val, ok, err := vs.Read(&f)
+               switch {
+               case err != nil:
+                       t.Errorf("Value.Read() returned error %v when it shouldn't have after writing: %v", err, tt.writes)
+               case ok && !tt.ok:
+                       t.Errorf("Value.Read() returned a value %v when it shouldn't have after writing: %v", val, tt.writes)
+               case !ok && tt.ok:
+                       t.Errorf("Value.Read() didn't return a value when it should have returned %v after writing: %v", tt.val, tt.writes)
+               case val != tt.val:
+                       t.Errorf("Value.Read()=%v, want %v after writing: %v", val, tt.val, tt.writes)
+               }
+       }
+}
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index 66fca00838d..dcdb3e74d1e 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -92,6 +92,19 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, 
opts ...Option) ([]PCo
                return nil, addParDoCtx(err, s)
        }
 
+       pipelineState := fn.PipelineState()
+       if len(pipelineState) > 0 {
+               edge.StateCoders = make(map[string]*coder.Coder)
+               for _, ps := range pipelineState {
+                       sT := typex.New(ps.CoderType())
+                       c, err := inferCoder(sT)
+                       if err != nil {
+                               return nil, addParDoCtx(err, s)
+                       }
+                       edge.StateCoders[ps.StateKey()] = c
+               }
+       }
+
        var ret []PCollection
        for _, out := range edge.Output {
                c := PCollection{out.To}

Reply via email to