[ 
https://issues.apache.org/jira/browse/BEAM-9935?focusedWorklogId=437896&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-437896
 ]

ASF GitHub Bot logged work on BEAM-9935:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/May/20 16:28
            Start Date: 27/May/20 16:28
    Worklog Time Spent: 10m 
      Work Description: lostluck commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431277688



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID, 
Name: n.Name, Count: c}
 }
 
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
 func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
-       if splits == nil {
-               return 0, fmt.Errorf("failed to split: requested splits were 
empty")
-       }
        if n == nil {
                return 0, fmt.Errorf("failed to split at requested splits: 
{%v}, DataSource not initialized", splits)
        }
+       if frac > 1.0 {
+               frac = 1.0
+       } else if frac < 0.0 {
+               frac = 0.0
+       }
+
        n.mu.Lock()
-       c := n.index
-       // Find the smallest split index that we haven't yet processed, and set
-       // the promised split index to this value.
-       for _, s := range splits {
-               // // Never split on the first element, or the current element.
-               if s > 0 && s > c && s <= n.splitIdx {
-                       n.splitIdx = s
-                       fs := n.splitIdx
-                       n.mu.Unlock()
-                       return fs, nil
-               }
+       s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+       if err != nil {
+               n.mu.Unlock()
+               return 0, err
        }
+       n.splitIdx = s
+       fs := n.splitIdx
        n.mu.Unlock()
-       // If we can't find a suitable split index from the requested choices,
-       // return an error.
-       return 0, fmt.Errorf("failed to split at requested splits: {%v}, 
DataSource at index: %v", splits, c)
+       return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx 
fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64) 
(int64, error) {
+       // Get split index from fraction. Find the closest index to the 
fraction of
+       // the remainder.
+       var start int64 = 0
+       if currIdx > start {
+               start = currIdx
+       }
+       // This is the first valid split index, since we should never split at 
0 or
+       // at the current element.
+       safeStart := start + 1
+       // The remainder starts at our actual progress (i.e. start), but our 
final
+       // split index has to be >= our safeStart.
+       fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+       if fracIdx < safeStart {
+               fracIdx = safeStart
+       }
+       if splits == nil {
+               // All split points are valid so just split at fraction.
+               return fracIdx, nil
+       } else {
+               // Find the closest unprocessed split point to our fraction.
+               sort.Slice(splits, func(i, j int) bool { return splits[i] < 
splits[j] })

Review comment:
       Whoops. Good point. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 437896)
    Time Spent: 2h 20m  (was: 2h 10m)

> Resolve differences in allowed_split_point implementations
> ----------------------------------------------------------
>
>                 Key: BEAM-9935
>                 URL: https://issues.apache.org/jira/browse/BEAM-9935
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go, sdk-java-harness, sdk-py-harness
>            Reporter: Luke Cwik
>            Assignee: Daniel Oliveira
>            Priority: P2
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> [Java SDK 
> harness|https://github.com/apache/beam/blob/d82d061aa303430f3d2853f397f3130fae6200cd/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java#L223]
>  doesn't support it yet, which is also safe.
> [Go SDK 
> harness|https://github.com/apache/beam/blob/d82d061aa303430f3d2853f397f3130fae6200cd/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L273]
>  only supports splits if points are specified and it doesn't use the fraction 
> at all.
> [Python SDK 
> harness|https://github.com/apache/beam/blob/d82d061aa303430f3d2853f397f3130fae6200cd/sdks/python/apache_beam/runners/worker/bundle_processor.py#L947]
>  ignores the split points, meaning that it may return an invalid split 
> location based upon the runner's limitations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to