[
https://issues.apache.org/jira/browse/BEAM-11106?focusedWorklogId=762390&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762390
]
ASF GitHub Bot logged work on BEAM-11106:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Apr/22 15:22
Start Date: 26/Apr/22 15:22
Worklog Time Spent: 10m
Work Description: riteshghorse commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858853893
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v",
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
}
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+ UID UnitID
+ Fn *graph.DoFn
+ Out Node
+ truncateInv *trInvoker
+ sizeInv *rsInvoker
+ ctInv *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+ return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+ fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+ var err error
+ if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+ return errors.WithContextf(err, "%v", n)
+ }
+
+ fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+ if fn != nil {
+ if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err != nil {
+ return err
+ }
+ }
+ fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+ if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+ return err
+ }
+ return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error {
+ return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+//
+// Output Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+ mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+ // TODO: change restriction extraction to consider watermark estimator after BEAM-11105 is merged.
+ rest := elm.Elm.(*FullValue).Elm2
+ rt := n.ctInv.Invoke(rest)
+ var err error
+ var newRest interface{}
+ if n.truncateInv == nil {
+ newRest = DefaultTruncateRestriction(rt)
Review Comment:
Good one. I thought of doing this earlier, but setting `n.fn` was a problem,
so I didn't pursue it further. But I guess I can get away with just setting
`n.call` to the default one in this case.
##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v",
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
}
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+ UID UnitID
+ Fn *graph.DoFn
+ Out Node
+ truncateInv *trInvoker
+ sizeInv *rsInvoker
+ ctInv *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+ return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+ fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+ var err error
+ if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+ return errors.WithContextf(err, "%v", n)
+ }
+
+ fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+ if fn != nil {
+ if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err != nil {
+ return err
+ }
+ }
+ fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+ if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+ return err
+ }
+ return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error {
+ return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+//
+// Output Diagram:
+// *FullValue {
+// Elm: *FullValue {
+// Elm: *FullValue (original input)
+// Elm2: Restriction
+// }
+// Elm2: float64 (size)
+// Windows
+// Timestamps
+// }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+ mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+ // TODO: change restriction extraction to consider watermark estimator after BEAM-11105 is merged.
+ rest := elm.Elm.(*FullValue).Elm2
+ rt := n.ctInv.Invoke(rest)
+ var err error
+ var newRest interface{}
+ if n.truncateInv == nil {
+ newRest = DefaultTruncateRestriction(rt)
+ } else {
+ newRest = n.truncateInv.Invoke(rt, mainElm)
+ }
+ size := n.sizeInv.Invoke(mainElm, newRest)
+ output := &FullValue{}
+ output.Timestamp = elm.Timestamp
+ output.Windows = elm.Windows
+ output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+ output.Elm2 = size
+
+ if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+ return err
+ }
+ return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+ n.truncateInv.Reset()
Review Comment:
yes, thanks!
Issue Time Tracking
-------------------
Worklog Id: (was: 762390)
Time Spent: 1.5h (was: 1h 20m)
> [Go SDK] Truncating SDFs during drain
> -------------------------------------
>
> Key: BEAM-11106
> URL: https://issues.apache.org/jira/browse/BEAM-11106
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-go
> Reporter: Robert Burke
> Assignee: Ritesh Ghorse
> Priority: P3
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Allow SDFs to specify a restriction truncation method to permit faster drains.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and
> the programming guide updated with SDF content.)
--
This message was sent by Atlassian Jira
(v8.20.7#820007)