damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858778358


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)

Review Comment:
   Rather than having this nil check, could we define `n.truncateInv` to 
default to `DefaultTruncateRestriction` in `Up`? That way we're spending less 
time on it on the hotter ProcessElement path and it should simplify some of our 
logic here.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -308,6 +314,17 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
        return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// HasTruncateRestriction returns whether the DoFn implements a custom 
truncate restriction function.
+func (f *SplittableDoFn) HasTruncateRestriction() bool {
+       _, ok := f.methods[truncateRestrictionName]
+       return ok
+}
+
+// TruncateRestrictionFn returns the "TruncateRestriction" function, if 
present.
+func (f *SplittableDoFn) TruncateRestrictionFn() *funcx.Fn {
+       return f.methods[truncateRestrictionName]
+}
+

Review Comment:
   We should also add validation code (and corresponding tests) to make sure 
that the supplied TruncateRestriction function is valid (similar to 
https://github.com/apache/beam/blob/15a064433a363f4c5443b55a43fc29dff836872c/sdks/go/pkg/beam/core/graph/fn.go#L526).
 This would include validating that the input element matches the input element 
type used in ProcessElement and that it has the correct number/type of 
parameters



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)
+       } else {
+               newRest = n.truncateInv.Invoke(rt, mainElm)
+       }
+       size := n.sizeInv.Invoke(mainElm, newRest)
+       output := &FullValue{}
+       output.Timestamp = elm.Timestamp
+       output.Windows = elm.Windows
+       output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+       output.Elm2 = size
+
+       if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+               return err
+       }
+       return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+       n.truncateInv.Reset()

Review Comment:
   We need to reset `n.ctInv` here as well



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)
+       } else {
+               newRest = n.truncateInv.Invoke(rt, mainElm)
+       }
+       size := n.sizeInv.Invoke(mainElm, newRest)
+       output := &FullValue{}
+       output.Timestamp = elm.Timestamp
+       output.Windows = elm.Windows
+       output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+       output.Elm2 = size
+
+       if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+               return err
+       }
+       return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+       n.truncateInv.Reset()

Review Comment:
   If truncateInv is nil, this will throw (another reason the approach I 
suggested above will help :) )



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
        // GetRestriction returns the restriction this tracker is tracking, or 
nil if the restriction
        // is unavailable for some reason.
        GetRestriction() interface{}
+
+       // IsBounded returns the boundedness of the current restriction. If the 
current restriction represents a
+       // finite amount of work, it should return sdf.Bounded. Otherwise, it 
should return sdf.Unbounded.
+       IsBounded() bool

Review Comment:
   This would be a breaking change for anyone who implements the RTracker 
interface (which is also why you needed to update all existing Trackers). One 
way to make this non-breaking would be to introduce a new interface that you 
can compose with this one, something like:
   
   ```
   type UnboundableRTracker interface {
      IsBounded() bool
   }
   ```
   (UnboundableRTracker might be a bad name, feel free to come up with 
something better 😄 ). Then whenever we want to check boundedness, we can call 
something like:
   
   ```
   isRtrackerBound(tracker RTracker) {
      if uTracker, ok := tracker.(UnboundableRTracker); ok && 
uTracker.IsBounded() {
         return true
      }
      return false
   }
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil

Review Comment:
   This default is confusing to me - why do we only handle the offsetrange 
case? Shouldn't we be doing a check to see if its bounded or not and then 
either returning the original restriction or nil?



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil
+       }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+       // Expects a signature of the form:
+       // (key?, value, restriction) []restriction
+       // TODO(BEAM-9643): Link to full documentation.
+       switch fnT := n.fn.Fn.(type) {
+       case reflectx.Func2x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call2x1(rest, elms.Elm)

Review Comment:
   Putting the restriction before the element is the opposite of what was 
described in the design doc. I actually think its the right ordering though, 
since its what was done in ProcessElement (and it matches my watermark 
changes), could you update the doc though?



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil
+       }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+       // Expects a signature of the form:
+       // (key?, value, restriction) []restriction
+       // TODO(BEAM-9643): Link to full documentation.
+       switch fnT := n.fn.Fn.(type) {
+       case reflectx.Func2x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call2x1(rest, elms.Elm)
+               }
+       case reflectx.Func3x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+               }
+       default:
+               switch len(n.fn.Param) {
+               case 2:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[0] = rest
+                               n.args[1] = elms.Elm
+                               return n.fn.Fn.Call(n.args)[0]
+                       }
+               case 3:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[1] = elms.Elm
+                               n.args[2] = elms.Elm2
+                               n.args[0] = rest

Review Comment:
   ```suggestion
                                n.args[0] = rest
                                n.args[1] = elms.Elm
                                n.args[2] = elms.Elm2
   ```
   
   Nit - you go in lowest to highest order above, could you do that here as 
well?



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
        // GetRestriction returns the restriction this tracker is tracking, or 
nil if the restriction
        // is unavailable for some reason.
        GetRestriction() interface{}
+
+       // IsBounded returns the boundedness of the current restriction. If the 
current restriction represents a
+       // finite amount of work, it should return sdf.Bounded. Otherwise, it 
should return sdf.Unbounded.
+       IsBounded() bool

Review Comment:
   Note - this implicitly treats all existing restrictions as bounded (which 
IMO is the right thing since we're introducing new behaviors here for unbounded 
restrictions)



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil
+       }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+       // Expects a signature of the form:
+       // (key?, value, restriction) []restriction
+       // TODO(BEAM-9643): Link to full documentation.
+       switch fnT := n.fn.Fn.(type) {
+       case reflectx.Func2x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call2x1(rest, elms.Elm)
+               }
+       case reflectx.Func3x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+               }
+       default:
+               switch len(n.fn.Param) {
+               case 2:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[0] = rest
+                               n.args[1] = elms.Elm
+                               return n.fn.Fn.Call(n.args)[0]
+                       }
+               case 3:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[1] = elms.Elm
+                               n.args[2] = elms.Elm2
+                               n.args[0] = rest
+                               return n.fn.Fn.Call(n.args)[0]
+                       }
+               default:
+                       return errors.Errorf("TruncateRestriction fn %v has 
unexpected number of parameters: %v",
+                               n.fn.Fn.Name(), len(n.fn.Param))
+               }
+       }
+       return nil
+}
+
+// Invoke calls TruncateRestriction given a FullValue containing an element and
+// the associated restriction tracker, and returns a truncated restriction.
+func (n *trInvoker) Invoke(rt interface{}, elms *FullValue) (rest interface{}) 
{
+       return n.call(elms, rt)

Review Comment:
   Nit: There's not really a reason to have it defined `Invoke(rest, elms)` and 
then have the invoker's call function defined `(elms, rest)` - could you flip 
the ordering of the call function to match all the other orderings here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to