lostluck commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r459721826



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -302,31 +302,67 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
        }
 
        n.mu.Lock()
+       defer n.mu.Unlock()
+
        var currProg float64 // Current element progress.
-       if n.index < 0 {     // Progress is at the end of the non-existant -1st 
element.
+       var su SplittableUnit
+       if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
                currProg = 1.0
-       } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+       } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
                currProg = 0.5
        } else { // If this is sub-element splittable, get progress of the 
current element.
-               rt := <-n.rt
-               d, r := rt.GetProgress()
-               currProg = d / (d + r)
-               n.rt <- rt
+               // If splittable, hold this tracker for the rest of the 
function so the element
+               // doesn't finish processing during a split.
+               su = <-n.su
+               if su == nil {
+                       return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+               }
+               defer func() {
+                       n.su <- su
+               }()
+               currProg = su.GetProgress()
        }
        // Size to split within is the minimum of bufSize or splitIdx so we 
avoid
        // including elements we already know won't be processed.
        if bufSize <= 0 || n.splitIdx < bufSize {
                bufSize = n.splitIdx
        }
-       s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+       s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
        if err != nil {
-               n.mu.Unlock()
-               return 0, err
+               return SplitResult{}, err
+       }
+       if f > 0.0 {

Review comment:
       Consider inverting the if statement. There are only 2 lines after this if 
statement, and 30 inside it. While it might mean the s.splitIdx assignment and 
the SplitResult construction are duplicated, it will be easier to read the other 
way.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -302,31 +302,67 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
        }
 
        n.mu.Lock()
+       defer n.mu.Unlock()
+
        var currProg float64 // Current element progress.
-       if n.index < 0 {     // Progress is at the end of the non-existant -1st 
element.
+       var su SplittableUnit
+       if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
                currProg = 1.0
-       } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+       } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
                currProg = 0.5
        } else { // If this is sub-element splittable, get progress of the 
current element.
-               rt := <-n.rt
-               d, r := rt.GetProgress()
-               currProg = d / (d + r)
-               n.rt <- rt
+               // If splittable, hold this tracker for the rest of the 
function so the element
+               // doesn't finish processing during a split.
+               su = <-n.su
+               if su == nil {
+                       return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+               }
+               defer func() {
+                       n.su <- su
+               }()
+               currProg = su.GetProgress()
        }
        // Size to split within is the minimum of bufSize or splitIdx so we 
avoid
        // including elements we already know won't be processed.
        if bufSize <= 0 || n.splitIdx < bufSize {
                bufSize = n.splitIdx
        }
-       s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+       s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
        if err != nil {
-               n.mu.Unlock()
-               return 0, err
+               return SplitResult{}, err
+       }
+       if f > 0.0 {
+               fr := f / (1.0 - currProg)
+               p, r, err := su.Split(fr)
+               if err != nil {
+                       return SplitResult{}, err
+               }
+
+               if p != nil && r != nil { // Successful split.
+                       pEnc, err := encodeElm(p, n.Coder)
+                       if err != nil {
+                               return SplitResult{}, err
+                       }
+                       rEnc, err := encodeElm(r, n.Coder)

Review comment:
       Constructing the encoders every split request is expensive, so we're 
best off creating those ahead of time, even if they aren't necessarily going to 
be used every time. Unlike on the main ProcessElement path, we can't just keep 
the encoders around in a function scope, so we'll have to just add it to the 
Source. But we can just construct them on demand at least, which is much 
cheaper than repeatedly constructing them.
   
   At the very least, ideally we reuse them between the Primary and the 
Residual. It's likely just noise for the repeated constructions for Successful 
splits, but we might as well do half as much work.
   
   In short, we should wait until we see this coder construction in a CPU 
profile before the heavier caching is applied, but we definitely should avoid 
the double work here.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/translate.go
##########
@@ -418,7 +418,8 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                        }
                                        u = n
                                        if urn == 
urnProcessSizedElementsAndRestrictions {
-                                               u = 
&ProcessSizedElementsAndRestrictions{PDo: n}
+                                               transform.GetInputs()

Review comment:
       Copy Pasta?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -325,6 +337,63 @@ func (n *ProcessSizedElementsAndRestrictions) String() 
string {
        return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] UID:%v 
Out:%v", path.Base(n.PDo.Fn.Name()), n.PDo.ID(), IDs(n.PDo.Out...))
 }
 
+type SplittableUnit interface {
+       Split(fraction float64) (primary, residual *FullValue, err error)
+       GetProgress() float64
+       GetTransformId() string
+       GetMainInputId() string
+}
+
+func (n *ProcessSizedElementsAndRestrictions) Split(f float64) (*FullValue, 
*FullValue, error) {
+       if n.rt == nil {
+               err := errors.New("Restriction tracker missing.")
+               return nil, nil, errors.WithContext(err, "Attempting split in 
ProcessSizedElementsAndRestrictions")
+       }
+       p, r, err := n.rt.TrySplit(f)
+       if err != nil {
+               return nil, nil, errors.WithContext(err, "Attempting split in 
ProcessSizedElementsAndRestrictions")
+       }
+
+       var pfv, rfv *FullValue
+       if r != nil { // If r is nil then the split failed/returned an empty 
residual.

Review comment:
       Here's another opportunity to invert the statement for clarity. 
   
   eg.
   
   if r == nil {
     return nil, nil, nil
   }
   
   rest of the function.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to