[
https://issues.apache.org/jira/browse/BEAM-11106?focusedWorklogId=762353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762353
]
ASF GitHub Bot logged work on BEAM-11106:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/22 14:36
Start Date: 26/Apr/22 14:36
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858778358
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v",
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
}
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+ UID UnitID
+ Fn *graph.DoFn
+ Out Node
+ truncateInv *trInvoker
+ sizeInv *rsInvoker
+ ctInv *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+ return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+ fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+ var err error
+ if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+ return errors.WithContextf(err, "%v", n)
+ }
+
+ fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+ if fn != nil {
+ if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err
!= nil {
+ return err
+ }
+ }
+ fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+ if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+ return err
+ }
+ return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string,
data DataContext) error {
+ return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+//
+// Output Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm
*FullValue, values ...ReStream) error {
+ mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+ // TODO: change restriction extraction to consider watermark estimator
after BEAM-11105 is merged.
+ rest := elm.Elm.(*FullValue).Elm2
+ rt := n.ctInv.Invoke(rest)
+ var err error
+ var newRest interface{}
+ if n.truncateInv == nil {
+ newRest = DefaultTruncateRestriction(rt)
Review Comment:
Rather than having this nil check, could we define `n.truncateInv` to
default to `DefaultTruncateRestriction` in `Up`? That way we're spending less
time on it on the hotter ProcessElement path and it should simplify some of our
logic here.
##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -308,6 +314,17 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
return f.CreateInitialRestrictionFn().Ret[0].T
}
+// HasTruncateRestriction returns whether the DoFn implements a custom
truncate restriction function.
+func (f *SplittableDoFn) HasTruncateRestriction() bool {
+ _, ok := f.methods[truncateRestrictionName]
+ return ok
+}
+
+// TruncateRestrictionFn returns the "TruncateRestriction" function, if
present.
+func (f *SplittableDoFn) TruncateRestrictionFn() *funcx.Fn {
+ return f.methods[truncateRestrictionName]
+}
+
Review Comment:
We should also add validation code (and corresponding tests) to make sure
that the supplied TruncateRestriction function is valid (similar to
https://github.com/apache/beam/blob/15a064433a363f4c5443b55a43fc29dff836872c/sdks/go/pkg/beam/core/graph/fn.go#L526).
This would include validating that the input element matches the input element
type used in ProcessElement and that it has the correct number/type of
parameters.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v",
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
}
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+ UID UnitID
+ Fn *graph.DoFn
+ Out Node
+ truncateInv *trInvoker
+ sizeInv *rsInvoker
+ ctInv *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+ return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+ fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+ var err error
+ if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+ return errors.WithContextf(err, "%v", n)
+ }
+
+ fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+ if fn != nil {
+ if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err
!= nil {
+ return err
+ }
+ }
+ fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+ if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+ return err
+ }
+ return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string,
data DataContext) error {
+ return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+//
+// Output Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm
*FullValue, values ...ReStream) error {
+ mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+ // TODO: change restriction extraction to consider watermark estimator
after BEAM-11105 is merged.
+ rest := elm.Elm.(*FullValue).Elm2
+ rt := n.ctInv.Invoke(rest)
+ var err error
+ var newRest interface{}
+ if n.truncateInv == nil {
+ newRest = DefaultTruncateRestriction(rt)
+ } else {
+ newRest = n.truncateInv.Invoke(rt, mainElm)
+ }
+ size := n.sizeInv.Invoke(mainElm, newRest)
+ output := &FullValue{}
+ output.Timestamp = elm.Timestamp
+ output.Windows = elm.Windows
+ output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+ output.Elm2 = size
+
+ if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+ return err
+ }
+ return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+ n.truncateInv.Reset()
Review Comment:
We need to reset `n.ctInv` here as well.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v",
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
}
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+ UID UnitID
+ Fn *graph.DoFn
+ Out Node
+ truncateInv *trInvoker
+ sizeInv *rsInvoker
+ ctInv *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+ return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+ fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+ var err error
+ if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+ return errors.WithContextf(err, "%v", n)
+ }
+
+ fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+ if fn != nil {
+ if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err
!= nil {
+ return err
+ }
+ }
+ fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+ if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+ return err
+ }
+ return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string,
data DataContext) error {
+ return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+//
+// Output Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm
*FullValue, values ...ReStream) error {
+ mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+ // TODO: change restriction extraction to consider watermark estimator
after BEAM-11105 is merged.
+ rest := elm.Elm.(*FullValue).Elm2
+ rt := n.ctInv.Invoke(rest)
+ var err error
+ var newRest interface{}
+ if n.truncateInv == nil {
+ newRest = DefaultTruncateRestriction(rt)
+ } else {
+ newRest = n.truncateInv.Invoke(rt, mainElm)
+ }
+ size := n.sizeInv.Invoke(mainElm, newRest)
+ output := &FullValue{}
+ output.Timestamp = elm.Timestamp
+ output.Windows = elm.Windows
+ output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+ output.Elm2 = size
+
+ if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+ return err
+ }
+ return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+ n.truncateInv.Reset()
Review Comment:
If truncateInv is nil, this will panic (another reason the approach I
suggested above will help :) )
##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
// GetRestriction returns the restriction this tracker is tracking, or
nil if the restriction
// is unavailable for some reason.
GetRestriction() interface{}
+
+ // IsBounded returns the boundedness of the current restriction. If the
current restriction represents a
+ // finite amount of work, it should return sdf.Bounded. Otherwise, it
should return sdf.Unbounded.
+ IsBounded() bool
Review Comment:
This would be a breaking change for anyone who implements the RTracker
interface (which is also why you needed to update all existing Trackers). One
way to make this non-breaking would be to introduce a new interface that you
can compose with this one, something like:
```
type UnboundableRTracker interface {
IsBounded() bool
}
```
(UnboundableRTracker might be a bad name, feel free to come up with
something better 😄 ). Then whenever we want to check boundedness, we can call
something like:
```
isRtrackerBound(tracker RTracker) {
if uTracker, ok := tracker.(UnboundableRTracker); ok &&
uTracker.IsBounded() {
return true
}
return false
}
```
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker =
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{})
{
+ switch restTracker.(type) {
+ case *offsetrange.Tracker:
+ return
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction)
// since offsetrange has a bounded restriction
+ default:
+ return nil
Review Comment:
This default is confusing to me - why do we only handle the offsetrange
case? Shouldn't we be doing a check to see if it's bounded or not and then
either returning the original restriction or nil?
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker =
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{})
{
+ switch restTracker.(type) {
+ case *offsetrange.Tracker:
+ return
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction)
// since offsetrange has a bounded restriction
+ default:
+ return nil
+ }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+ n := &trInvoker{
+ fn: fn,
+ args: make([]interface{}, len(fn.Param)),
+ }
+ if err := n.initCallFn(); err != nil {
+ return nil, errors.WithContext(err, "sdf TruncateRestriction
invoker")
+ }
+ return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+ // Expects a signature of the form:
+ // (key?, value, restriction) []restriction
+ // TODO(BEAM-9643): Link to full documentation.
+ switch fnT := n.fn.Fn.(type) {
+ case reflectx.Func2x1:
+ n.call = func(elms *FullValue, rest interface{}) interface{} {
+ return fnT.Call2x1(rest, elms.Elm)
Review Comment:
Putting the restriction before the element is the opposite of what was
described in the design doc. I actually think it's the right ordering though,
since it's what was done in ProcessElement (and it matches my watermark
changes). Could you update the doc to match?
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker =
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{})
{
+ switch restTracker.(type) {
+ case *offsetrange.Tracker:
+ return
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction)
// since offsetrange has a bounded restriction
+ default:
+ return nil
+ }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+ n := &trInvoker{
+ fn: fn,
+ args: make([]interface{}, len(fn.Param)),
+ }
+ if err := n.initCallFn(); err != nil {
+ return nil, errors.WithContext(err, "sdf TruncateRestriction
invoker")
+ }
+ return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+ // Expects a signature of the form:
+ // (key?, value, restriction) []restriction
+ // TODO(BEAM-9643): Link to full documentation.
+ switch fnT := n.fn.Fn.(type) {
+ case reflectx.Func2x1:
+ n.call = func(elms *FullValue, rest interface{}) interface{} {
+ return fnT.Call2x1(rest, elms.Elm)
+ }
+ case reflectx.Func3x1:
+ n.call = func(elms *FullValue, rest interface{}) interface{} {
+ return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+ }
+ default:
+ switch len(n.fn.Param) {
+ case 2:
+ n.call = func(elms *FullValue, rest interface{})
interface{} {
+ n.args[0] = rest
+ n.args[1] = elms.Elm
+ return n.fn.Fn.Call(n.args)[0]
+ }
+ case 3:
+ n.call = func(elms *FullValue, rest interface{})
interface{} {
+ n.args[1] = elms.Elm
+ n.args[2] = elms.Elm2
+ n.args[0] = rest
Review Comment:
```suggestion
n.args[0] = rest
n.args[1] = elms.Elm
n.args[2] = elms.Elm2
```
Nit - you go in lowest to highest order above, could you do that here as
well?
##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
// GetRestriction returns the restriction this tracker is tracking, or
nil if the restriction
// is unavailable for some reason.
GetRestriction() interface{}
+
+ // IsBounded returns the boundedness of the current restriction. If the
current restriction represents a
+ // finite amount of work, it should return sdf.Bounded. Otherwise, it
should return sdf.Unbounded.
+ IsBounded() bool
Review Comment:
Note - this implicitly treats all existing restrictions as bounded (which
IMO is the right thing since we're introducing new behaviors here for unbounded
restrictions)
##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
n.args[i] = nil
}
}
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+ fn *funcx.Fn
+ args []interface{}
+ call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker =
reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{})
{
+ switch restTracker.(type) {
+ case *offsetrange.Tracker:
+ return
restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction)
// since offsetrange has a bounded restriction
+ default:
+ return nil
+ }
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+ n := &trInvoker{
+ fn: fn,
+ args: make([]interface{}, len(fn.Param)),
+ }
+ if err := n.initCallFn(); err != nil {
+ return nil, errors.WithContext(err, "sdf TruncateRestriction
invoker")
+ }
+ return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+ // Expects a signature of the form:
+ // (key?, value, restriction) []restriction
+ // TODO(BEAM-9643): Link to full documentation.
+ switch fnT := n.fn.Fn.(type) {
+ case reflectx.Func2x1:
+ n.call = func(elms *FullValue, rest interface{}) interface{} {
+ return fnT.Call2x1(rest, elms.Elm)
+ }
+ case reflectx.Func3x1:
+ n.call = func(elms *FullValue, rest interface{}) interface{} {
+ return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+ }
+ default:
+ switch len(n.fn.Param) {
+ case 2:
+ n.call = func(elms *FullValue, rest interface{})
interface{} {
+ n.args[0] = rest
+ n.args[1] = elms.Elm
+ return n.fn.Fn.Call(n.args)[0]
+ }
+ case 3:
+ n.call = func(elms *FullValue, rest interface{})
interface{} {
+ n.args[1] = elms.Elm
+ n.args[2] = elms.Elm2
+ n.args[0] = rest
+ return n.fn.Fn.Call(n.args)[0]
+ }
+ default:
+ return errors.Errorf("TruncateRestriction fn %v has
unexpected number of parameters: %v",
+ n.fn.Fn.Name(), len(n.fn.Param))
+ }
+ }
+ return nil
+}
+
+// Invoke calls TruncateRestriction given a FullValue containing an element and
+// the associated restriction tracker, and returns a truncated restriction.
+func (n *trInvoker) Invoke(rt interface{}, elms *FullValue) (rest interface{})
{
+ return n.call(elms, rt)
Review Comment:
Nit: There's not really a reason to have it defined `Invoke(rest, elms)` and
then have the invoker's call function defined `(elms, rest)` - could you flip
the ordering of the call function to match all the other orderings here?
Issue Time Tracking
-------------------
Worklog Id: (was: 762353)
Time Spent: 1h 10m (was: 1h)
> [Go SDK] Truncating SDFs during drain
> -------------------------------------
>
> Key: BEAM-11106
> URL: https://issues.apache.org/jira/browse/BEAM-11106
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Ritesh Ghorse
> Priority: P3
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> Allow SDFs to specify a restriction truncation method to permit faster drains.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)