youngoli commented on a change in pull request #11791:
URL: https://github.com/apache/beam/pull/11791#discussion_r431615426
##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -266,33 +267,85 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
return ProgressReportSnapshot{PID: n.outputPID, ID: n.SID.PtransformID,
Name: n.Name, Count: c}
}
-// Split takes a sorted set of potential split indices, selects and actuates
-// split on an appropriate split index, and returns the selected split index
-// if successful. Returns an error when unable to split.
+// Split takes a sorted set of potential split indices and a fraction of the
+// remainder to split at, selects and actuates a split on an appropriate split
+// index, and returns the selected split index if successful. Returns an error
+// when unable to split.
func (n *DataSource) Split(splits []int64, frac float64) (int64, error) {
- if splits == nil {
- return 0, fmt.Errorf("failed to split: requested splits were
empty")
- }
if n == nil {
return 0, fmt.Errorf("failed to split at requested splits:
{%v}, DataSource not initialized", splits)
}
+ if frac > 1.0 {
+ frac = 1.0
+ } else if frac < 0.0 {
+ frac = 0.0
+ }
+
n.mu.Lock()
- c := n.index
- // Find the smallest split index that we haven't yet processed, and set
- // the promised split index to this value.
- for _, s := range splits {
- // // Never split on the first element, or the current element.
- if s > 0 && s > c && s <= n.splitIdx {
- n.splitIdx = s
- fs := n.splitIdx
- n.mu.Unlock()
- return fs, nil
- }
+ s, err := splitHelper(n.index, n.splitIdx, splits, frac)
+ if err != nil {
+ n.mu.Unlock()
+ return 0, err
}
+ n.splitIdx = s
+ fs := n.splitIdx
n.mu.Unlock()
- // If we can't find a suitable split index from the requested choices,
- // return an error.
- return 0, fmt.Errorf("failed to split at requested splits: {%v},
DataSource at index: %v", splits, c)
+ return fs, nil
+}
+
+// splitHelper is a helper function that finds a split point in a range.
+// currIdx and splitIdx should match the DataSource's index and splitIdx
fields,
+// and represent the start and end of the splittable range respectively. splits
+// is an optional slice of valid split indices, and if nil then all indices are
+// considered valid split points. frac must be between [0, 1], and represents
+// a fraction of the remaining work that the split point aims to be as close
+// as possible to.
+func splitHelper(currIdx, splitIdx int64, splits []int64, frac float64)
(int64, error) {
+ // Get split index from fraction. Find the closest index to the
fraction of
+ // the remainder.
+ var start int64 = 0
+ if currIdx > start {
+ start = currIdx
+ }
+ // This is the first valid split index, since we should never split at
0 or
+ // at the current element.
+ safeStart := start + 1
+ // The remainder starts at our actual progress (i.e. start), but our
final
+ // split index has to be >= our safeStart.
+ fracIdx := start + int64(math.Round(frac*float64(splitIdx-start)))
+ if fracIdx < safeStart {
+ fracIdx = safeStart
+ }
+ if splits == nil {
+ // All split points are valid so just split at fraction.
+ return fracIdx, nil
+ } else {
+ // Find the closest unprocessed split point to our fraction.
+ sort.Slice(splits, func(i, j int) bool { return splits[i] <
splits[j] })
+ var prevDiff int64 = math.MaxInt64
+ var bestS int64 = -1
+ for _, s := range splits {
+ if s >= safeStart && s <= splitIdx {
+ diff := intAbs(fracIdx - s)
+ if diff <= prevDiff {
+ prevDiff = diff
+ bestS = s
+ } else {
+ break // Stop early if the difference
starts increasing.
+ }
+ }
+ }
+ if bestS != -1 {
+ return bestS, nil
+ }
+ }
+ return 0, fmt.Errorf("failed to split at requested splits: {%v},
DataSource at index: %v", splits, currIdx)
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org