[ 
https://issues.apache.org/jira/browse/BEAM-11106?focusedWorklogId=762353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762353
 ]

ASF GitHub Bot logged work on BEAM-11106:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 14:36
            Start Date: 26/Apr/22 14:36
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858778358


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)

Review Comment:
   Rather than having this nil check, could we define `n.truncateInv` to 
default to `DefaultTruncateRestriction` in `Up`? That way we're spending less 
time on it on the hotter ProcessElement path and it should simplify some of our 
logic here.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -308,6 +314,17 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
        return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// HasTruncateRestriction returns whether the DoFn implements a custom 
truncate restriction function.
+func (f *SplittableDoFn) HasTruncateRestriction() bool {
+       _, ok := f.methods[truncateRestrictionName]
+       return ok
+}
+
+// TruncateRestrictionFn returns the "TruncateRestriction" function, if 
present.
+func (f *SplittableDoFn) TruncateRestrictionFn() *funcx.Fn {
+       return f.methods[truncateRestrictionName]
+}
+

Review Comment:
   We should also add validation code (and corresponding tests) to make sure 
that the supplied TruncateRestriction function is valid (similar to 
https://github.com/apache/beam/blob/15a064433a363f4c5443b55a43fc29dff836872c/sdks/go/pkg/beam/core/graph/fn.go#L526).
This would include validating that its input element type matches the input 
element type used in ProcessElement and that it has the correct number and 
types of parameters.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)
+       } else {
+               newRest = n.truncateInv.Invoke(rt, mainElm)
+       }
+       size := n.sizeInv.Invoke(mainElm, newRest)
+       output := &FullValue{}
+       output.Timestamp = elm.Timestamp
+       output.Windows = elm.Windows
+       output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+       output.Elm2 = size
+
+       if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+               return err
+       }
+       return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+       n.truncateInv.Reset()

Review Comment:
   We need to reset `n.ctInv` here as well



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)
+       } else {
+               newRest = n.truncateInv.Invoke(rt, mainElm)
+       }
+       size := n.sizeInv.Invoke(mainElm, newRest)
+       output := &FullValue{}
+       output.Timestamp = elm.Timestamp
+       output.Windows = elm.Windows
+       output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+       output.Elm2 = size
+
+       if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+               return err
+       }
+       return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+       n.truncateInv.Reset()

Review Comment:
   If `truncateInv` is nil, this will panic (another reason the approach I 
suggested above will help :) ).



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
        // GetRestriction returns the restriction this tracker is tracking, or 
nil if the restriction
        // is unavailable for some reason.
        GetRestriction() interface{}
+
+       // IsBounded returns the boundedness of the current restriction. If the 
current restriction represents a
+       // finite amount of work, it should return sdf.Bounded. Otherwise, it 
should return sdf.Unbounded.
+       IsBounded() bool

Review Comment:
   This would be a breaking change for anyone who implements the RTracker 
interface (which is also why you needed to update all existing Trackers). One 
way to make this non-breaking would be to introduce a new interface that you 
can compose with this one, something like:
   
   ```
   type UnboundableRTracker interface {
      IsBounded() bool
   }
   ```
   (UnboundableRTracker might be a bad name, feel free to come up with 
something better 😄 ). Then whenever we want to check boundedness, we can call 
something like:
   
   ```
   func isRTrackerBounded(tracker RTracker) bool {
      if uTracker, ok := tracker.(UnboundableRTracker); ok {
         return uTracker.IsBounded()
      }
      // Trackers that don't implement the new interface are treated as bounded.
      return true
   }
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil

Review Comment:
   This default is confusing to me - why do we only handle the offsetrange 
case? Shouldn't we be doing a check to see if it's bounded or not and then 
either returning the original restriction or nil?



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil
+       }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+       // Expects a signature of the form:
+       // (key?, value, restriction) []restriction
+       // TODO(BEAM-9643): Link to full documentation.
+       switch fnT := n.fn.Fn.(type) {
+       case reflectx.Func2x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call2x1(rest, elms.Elm)

Review Comment:
   Putting the restriction before the element is the opposite of what was 
described in the design doc. I actually think it's the right ordering though, 
since it's what was done in ProcessElement (and it matches my watermark 
changes). Could you update the doc to match?



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil
+       }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+       // Expects a signature of the form:
+       // (key?, value, restriction) []restriction
+       // TODO(BEAM-9643): Link to full documentation.
+       switch fnT := n.fn.Fn.(type) {
+       case reflectx.Func2x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call2x1(rest, elms.Elm)
+               }
+       case reflectx.Func3x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+               }
+       default:
+               switch len(n.fn.Param) {
+               case 2:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[0] = rest
+                               n.args[1] = elms.Elm
+                               return n.fn.Fn.Call(n.args)[0]
+                       }
+               case 3:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[1] = elms.Elm
+                               n.args[2] = elms.Elm2
+                               n.args[0] = rest

Review Comment:
   ```suggestion
                                n.args[0] = rest
                                n.args[1] = elms.Elm
                                n.args[2] = elms.Elm2
   ```
   
   Nit - you go in lowest to highest order above, could you do that here as 
well?



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
        // GetRestriction returns the restriction this tracker is tracking, or 
nil if the restriction
        // is unavailable for some reason.
        GetRestriction() interface{}
+
+       // IsBounded returns the boundedness of the current restriction. If the 
current restriction represents a
+       // finite amount of work, it should return sdf.Bounded. Otherwise, it 
should return sdf.Unbounded.
+       IsBounded() bool

Review Comment:
   Note - this implicitly treats all existing restrictions as bounded (which 
IMO is the right thing since we're introducing new behaviors here for unbounded 
restrictions)



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
                n.args[i] = nil
        }
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+       fn   *funcx.Fn
+       args []interface{}
+       call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = 
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) 
{
+       switch restTracker.(type) {
+       case *offsetrange.Tracker:
+               return 
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) 
// since offsetrange has a bounded restriction
+       default:
+               return nil
+       }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+       n := &trInvoker{
+               fn:   fn,
+               args: make([]interface{}, len(fn.Param)),
+       }
+       if err := n.initCallFn(); err != nil {
+               return nil, errors.WithContext(err, "sdf TruncateRestriction 
invoker")
+       }
+       return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+       // Expects a signature of the form:
+       // (key?, value, restriction) []restriction
+       // TODO(BEAM-9643): Link to full documentation.
+       switch fnT := n.fn.Fn.(type) {
+       case reflectx.Func2x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call2x1(rest, elms.Elm)
+               }
+       case reflectx.Func3x1:
+               n.call = func(elms *FullValue, rest interface{}) interface{} {
+                       return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+               }
+       default:
+               switch len(n.fn.Param) {
+               case 2:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[0] = rest
+                               n.args[1] = elms.Elm
+                               return n.fn.Fn.Call(n.args)[0]
+                       }
+               case 3:
+                       n.call = func(elms *FullValue, rest interface{}) 
interface{} {
+                               n.args[1] = elms.Elm
+                               n.args[2] = elms.Elm2
+                               n.args[0] = rest
+                               return n.fn.Fn.Call(n.args)[0]
+                       }
+               default:
+                       return errors.Errorf("TruncateRestriction fn %v has 
unexpected number of parameters: %v",
+                               n.fn.Fn.Name(), len(n.fn.Param))
+               }
+       }
+       return nil
+}
+
+// Invoke calls TruncateRestriction given a FullValue containing an element and
+// the associated restriction tracker, and returns a truncated restriction.
+func (n *trInvoker) Invoke(rt interface{}, elms *FullValue) (rest interface{}) 
{
+       return n.call(elms, rt)

Review Comment:
   Nit: There's not really a reason to define `Invoke(rt, elms)` with the 
restriction tracker first and then define the invoker's call function as 
`(elms, rest)` - could you flip the ordering of the call function to match all 
the other orderings here?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762353)
    Time Spent: 1h 10m  (was: 1h)

> [Go SDK] Truncating SDFs during drain
> -------------------------------------
>
>                 Key: BEAM-11106
>                 URL: https://issues.apache.org/jira/browse/BEAM-11106
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Ritesh Ghorse
>            Priority: P3
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Allow SDFs to specify a restriction truncation method to permit faster drains.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to