damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r862550340


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,116 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals
+// to drain the pipeline. This step is followed by
+// ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID returns the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       } else {
+               if n.truncateInv, err = newDefaultTruncateRestrictionInvoker(); 
err != nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle forwards the StartBundle call to the output node.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.

Review Comment:
   This change went in - https://github.com/apache/beam/pull/17374
   
   So it should now be:
   
   ```
   rest := elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
   ```
   
   and below:
   
   ```
   output.Elm = &FullValue{Elm: mainElm, Elm2: &FullValue{Elm: newRest, Elm2: elm.Elm.(*FullValue).Elm2.(*FullValue).Elm2}}
   ```
   
   And the input/output diagram should be updated to:
   
   ```
   //   *FullValue {
   //     Elm: *FullValue {
   //       Elm:  *FullValue (original input)
   //       Elm2: *FullValue {
   //         Elm: Restriction
   //         Elm2: Watermark estimator state
   //       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to