riteshghorse commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858853893


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)

Review Comment:
   Good one. I thought of doing this earlier, but setting `n.fn` was a problem, 
so I didn't pursue it further. But I guess I can get away with just setting 
`n.call` to the default one in this case.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.
+       rest := elm.Elm.(*FullValue).Elm2
+       rt := n.ctInv.Invoke(rest)
+       var err error
+       var newRest interface{}
+       if n.truncateInv == nil {
+               newRest = DefaultTruncateRestriction(rt)
+       } else {
+               newRest = n.truncateInv.Invoke(rt, mainElm)
+       }
+       size := n.sizeInv.Invoke(mainElm, newRest)
+       output := &FullValue{}
+       output.Timestamp = elm.Timestamp
+       output.Windows = elm.Windows
+       output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+       output.Elm2 = size
+
+       if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+               return err
+       }
+       return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+       n.truncateInv.Reset()

Review Comment:
   yes, thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to