[ 
https://issues.apache.org/jira/browse/BEAM-9642?focusedWorklogId=432559&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-432559
 ]

ASF GitHub Bot logged work on BEAM-9642:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/May/20 19:28
            Start Date: 10/May/20 19:28
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #11645:
URL: https://github.com/apache/beam/pull/11645#discussion_r422682835



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx 
context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
        return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] 
Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes 
all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+       PDo *ParDo
+
+       initRestInv *cirInvoker
+       splitInv    *srInvoker
+       trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+       return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+       sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+       addContext := func(err error) error {
+               return errors.WithContextf(err, "SdfFallback transform with UID 
%v", n.ID())

Review comment:
       Consider just using the automatic String() output as the context instead of 
adding the extra words, e.g. (err, "%v", n).

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx 
context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
        return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] 
Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes 
all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+       PDo *ParDo
+
+       initRestInv *cirInvoker
+       splitInv    *srInvoker
+       trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+       return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+       sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+       addContext := func(err error) error {
+               return errors.WithContextf(err, "SdfFallback transform with UID 
%v", n.ID())
+       }
+       var err error
+       if n.initRestInv, err = 
newCreateInitialRestrictionInvoker(sdf.CreateInitialRestrictionFn()); err != 
nil {
+               return addContext(err)
+       }
+       if n.splitInv, err = 
newSplitRestrictionInvoker(sdf.SplitRestrictionFn()); err != nil {
+               return addContext(err)
+       }
+       if n.trackerInv, err = newCreateTrackerInvoker(sdf.CreateTrackerFn()); 
err != nil {
+               return addContext(err)
+       }
+       return n.PDo.Up(ctx)
+}
+
+// StartBundle just defers to the ParDo's StartBundle method.

Review comment:
       Editorial nits: 
   We can probably remove the "just".
   Given "defer" is a keyword in Go, using "defers" might be misinterpreted. 
Consider replacing it with "calls".
   
   ```suggestion
   // StartBundle calls the ParDo's StartBundle method.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx 
context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
        return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] 
Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes 
all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+       PDo *ParDo
+
+       initRestInv *cirInvoker
+       splitInv    *srInvoker
+       trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+       return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+       sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+       addContext := func(err error) error {
+               return errors.WithContextf(err, "SdfFallback transform with UID 
%v", n.ID())
+       }
+       var err error
+       if n.initRestInv, err = 
newCreateInitialRestrictionInvoker(sdf.CreateInitialRestrictionFn()); err != 
nil {
+               return addContext(err)
+       }
+       if n.splitInv, err = 
newSplitRestrictionInvoker(sdf.SplitRestrictionFn()); err != nil {
+               return addContext(err)
+       }
+       if n.trackerInv, err = newCreateTrackerInvoker(sdf.CreateTrackerFn()); 
err != nil {
+               return addContext(err)
+       }
+       return n.PDo.Up(ctx)
+}
+
+// StartBundle just defers to the ParDo's StartBundle method.
+func (n *SdfFallback) StartBundle(ctx context.Context, id string, data 
DataContext) error {
+       return n.PDo.StartBundle(ctx, id, data)
+}
+
+// ProcessElement performs all the work from the steps above in one transform.
+// This means creating initial restrictions, performing initial splits on those
+// restrictions, and then creating restriction trackers and processing each
+// restriction with the underlying ParDo. This executor skips the sizing step
+// because sizing information is unnecessary for unexpanded SDFs.
+func (n *SdfFallback) ProcessElement(ctx context.Context, elm *FullValue, 
values ...ReStream) error {
+       if n.PDo.status != Active {
+               return errors.Errorf("invalid status for ParDo %v: %v, want 
Active", n.PDo.UID, n.PDo.status)
+       }
+
+       rest := n.initRestInv.Invoke(elm)
+       splitRests := n.splitInv.Invoke(elm, rest)
+       if len(splitRests) == 0 {
+               err := errors.Errorf("initial splitting returned 0 
restrictions.")
+               return errors.WithContextf(err, "SdfFallback transform with UID 
%v", n.ID())

Review comment:
       Same comment here with respect to the error context.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx 
context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
        return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] 
Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes 
all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+       PDo *ParDo
+
+       initRestInv *cirInvoker
+       splitInv    *srInvoker
+       trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+       return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+       sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+       addContext := func(err error) error {
+               return errors.WithContextf(err, "SdfFallback transform with UID 
%v", n.ID())
+       }
+       var err error
+       if n.initRestInv, err = 
newCreateInitialRestrictionInvoker(sdf.CreateInitialRestrictionFn()); err != 
nil {
+               return addContext(err)
+       }
+       if n.splitInv, err = 
newSplitRestrictionInvoker(sdf.SplitRestrictionFn()); err != nil {
+               return addContext(err)
+       }
+       if n.trackerInv, err = newCreateTrackerInvoker(sdf.CreateTrackerFn()); 
err != nil {
+               return addContext(err)
+       }
+       return n.PDo.Up(ctx)
+}
+
+// StartBundle just defers to the ParDo's StartBundle method.
+func (n *SdfFallback) StartBundle(ctx context.Context, id string, data 
DataContext) error {
+       return n.PDo.StartBundle(ctx, id, data)
+}
+
+// ProcessElement performs all the work from the steps above in one transform.
+// This means creating initial restrictions, performing initial splits on those
+// restrictions, and then creating restriction trackers and processing each
+// restriction with the underlying ParDo. This executor skips the sizing step
+// because sizing information is unnecessary for unexpanded SDFs.
+func (n *SdfFallback) ProcessElement(ctx context.Context, elm *FullValue, 
values ...ReStream) error {
+       if n.PDo.status != Active {
+               return errors.Errorf("invalid status for ParDo %v: %v, want 
Active", n.PDo.UID, n.PDo.status)
+       }
+
+       rest := n.initRestInv.Invoke(elm)
+       splitRests := n.splitInv.Invoke(elm, rest)
+       if len(splitRests) == 0 {
+               err := errors.Errorf("initial splitting returned 0 
restrictions.")
+               return errors.WithContextf(err, "SdfFallback transform with UID 
%v", n.ID())
+       }
+
+       for _, splitRest := range splitRests {
+               rt := n.trackerInv.Invoke(splitRest)
+               mainIn := &MainInput{
+                       Key:      *elm,
+                       Values:   values,
+                       RTracker: rt,
+               }
+               if err := n.PDo.processMainInput(mainIn); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+// FinishBundle does some teardown for the end of the bundle and then defers to
+// the ParDo's FinishBundle method.

Review comment:
       ```suggestion
   // FinishBundle resets the invokers and then calls the ParDo's FinishBundle 
method.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 432559)
    Time Spent: 6h 10m  (was: 6h)

> Add SDF execution-time runners
> ------------------------------
>
>                 Key: BEAM-9642
>                 URL: https://issues.apache.org/jira/browse/BEAM-9642
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: Major
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Adds execution-time SDF runner units to the exec package, and any unit tests 
> + helpers required.
> This is needed to get the expanded SDF URNs to execute in the runner harness.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to