[ 
https://issues.apache.org/jira/browse/BEAM-11106?focusedWorklogId=764821&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764821
 ]

ASF GitHub Bot logged work on BEAM-11106:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/May/22 01:17
            Start Date: 02/May/22 01:17
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r862550340


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,116 @@ func (n *SplitAndSizeRestrictions) String() string {
        return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", 
path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals 
to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+       UID         UnitID
+       Fn          *graph.DoFn
+       Out         Node
+       truncateInv *trInvoker
+       sizeInv     *rsInvoker
+       ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+       return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+       fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+       var err error
+       if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+               return errors.WithContextf(err, "%v", n)
+       }
+
+       fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+       if fn != nil {
+               if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err 
!= nil {
+                       return err
+               }
+       } else {
+               if n.truncateInv, err = newDefaultTruncateRestrictionInvoker(); 
err != nil {
+                       return err
+               }
+       }
+       fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+       if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+               return err
+       }
+       return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, 
data DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm 
*FullValue, values ...ReStream) error {
+       mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+       // TODO: change restriction extraction to consider watermark estimator 
after BEAM-11105 is merged.

Review Comment:
   This change went in - https://github.com/apache/beam/pull/17374
   
   So it should now be:
   
   ```
   rest := elm.Elm.(*FullValue).Elm2.(*FullValue).Elm
   ```
   
   and below:
   
   ```
   output.Elm = &FullValue{Elm: mainElm, Elm2: &FullValue{newRest, 
elm.Elm.(*FullValue).Elm2.(*FullValue).Elm2}}
   ```
   
   And the input/output diagram should be updated to:
   
   ```
   //   *FullValue {
   //     Elm: *FullValue {
   //       Elm:  *FullValue (original input)
   //       Elm2: *FullValue {
   //         Elm: Restriction
   //      Elm2: Watermark estimator state
   //       }
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 764821)
    Time Spent: 3h 10m  (was: 3h)

> [Go SDK] Truncating SDFs during drain
> -------------------------------------
>
>                 Key: BEAM-11106
>                 URL: https://issues.apache.org/jira/browse/BEAM-11106
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Ritesh Ghorse
>            Priority: P3
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> Allow SDFs to specify a restriction truncation method to permit faster drains.
> (To be updated once [https://github.com/apache/beam/pull/13160] is merged and 
> the programming guide updated with SDF content.)



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to