[
https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=774709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774709
]
ASF GitHub Bot logged work on BEAM-14511:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 25/May/22 17:29
Start Date: 25/May/22 17:29
Worklog Time Spent: 10m
Work Description: damccorm commented on code in PR #17754:
URL: https://github.com/apache/beam/pull/17754#discussion_r881913985
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
Review Comment:
I'm a little confused by this condition - if `tracker.Start() ==
tracker.End()`, doesn't that mean we're done (or there's an error I guess)?
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
Review Comment:
```suggestion
if tracker.attempted == math.MaxInt64 {
```
If this evaluates to true, the first condition must be true as well, so the first check is redundant.
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
Review Comment:
```suggestion
log.Infof(context.Background(), "Tracker is no longer growable;
doing the normal OffsetTracker TrySplit(%f)", fraction)
```
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ var cur int64
+ if tracker.attempted != -1 {
+ cur = tracker.attempted
+ } else {
+ cur = tracker.Start() - 1
+ }
+
+ estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+ splitPt := cur + int64(math.Ceil(math.Max(1,
float64(estimatedEnd-cur)*(fraction))))
+ log.Infof(context.Background(), "Split using estimatedEnd,
estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt)
+ if splitPt > estimatedEnd {
+ return tracker.rest, nil, nil
+ }
+ if splitPt < tracker.rest.Start {
Review Comment:
Is this possible?
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
Review Comment:
Should we just shell out to `tracker.IsBounded()`?
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
Review Comment:
Also, if tracker.End() is math.MaxInt64, doesn't that mean that this tracker
is **not** growable anymore?
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ var cur int64
+ if tracker.attempted != -1 {
+ cur = tracker.attempted
+ } else {
+ cur = tracker.Start() - 1
+ }
+
+ estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+ splitPt := cur + int64(math.Ceil(math.Max(1,
float64(estimatedEnd-cur)*(fraction))))
+ log.Infof(context.Background(), "Split using estimatedEnd,
estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt)
+ if splitPt > estimatedEnd {
+ return tracker.rest, nil, nil
+ }
+ if splitPt < tracker.rest.Start {
+ return tracker.rest, nil, nil
+ }
+ residual = Restriction{Start: splitPt, End: tracker.End()}
+ tracker.rest.End = splitPt
+ return tracker.rest, residual, nil
+}
+
+// GetProgress reports progress based on the claimed size and unclaimed sizes
of the restriction.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+ log.Infof(context.Background(), "PROGRESS: tracker: %#v", tracker)
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.End() == tracker.Start() {
+ return tracker.Tracker.GetProgress()
+ }
+
+ if tracker.attempted == -1 {
Review Comment:
Does special casing this actually get us anything? It should return the same
results without this if, right?
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ var cur int64
+ if tracker.attempted != -1 {
+ cur = tracker.attempted
+ } else {
+ cur = tracker.Start() - 1
+ }
Review Comment:
```suggestion
cur := math.Max(tracker.attempted, tracker.Start() - 1)
```
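Double check me, but I think this is valid and significantly simplifies
things (and removes some of the reliance on -1 as a magic number). Note that
math.Max operates on float64, so with these int64 values this would need the
int64 `max` helper defined above instead, e.g.
`cur := max(tracker.attempted, tracker.Start()-1)`; it also relies on
Start() being non-negative, so that Start()-1 is never below -1.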
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ var cur int64
+ if tracker.attempted != -1 {
+ cur = tracker.attempted
+ } else {
+ cur = tracker.Start() - 1
+ }
+
+ estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+ splitPt := cur + int64(math.Ceil(math.Max(1,
float64(estimatedEnd-cur)*(fraction))))
+ log.Infof(context.Background(), "Split using estimatedEnd,
estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt)
+ if splitPt > estimatedEnd {
+ return tracker.rest, nil, nil
+ }
+ if splitPt < tracker.rest.Start {
+ return tracker.rest, nil, nil
+ }
+ residual = Restriction{Start: splitPt, End: tracker.End()}
+ tracker.rest.End = splitPt
+ return tracker.rest, residual, nil
+}
+
+// GetProgress reports progress based on the claimed size and unclaimed sizes
of the restriction.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+ log.Infof(context.Background(), "PROGRESS: tracker: %#v", tracker)
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.End() == tracker.Start() {
+ return tracker.Tracker.GetProgress()
+ }
+
+ if tracker.attempted == -1 {
+ done = 0
+ remaining =
math.Max(float64(tracker.End())-float64(tracker.Start()), 0)
+ return done, remaining
+ }
+
+ done = float64((tracker.claimed + 1) - tracker.Start())
+ remaining = float64(tracker.rest.End - (tracker.claimed + 1))
Review Comment:
Instead of `tracker.rest.End`, should we be using the estimated end? I'm not
sure how this is supposed to behave, but that would make sense to me.
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ var cur int64
+ if tracker.attempted != -1 {
+ cur = tracker.attempted
+ } else {
+ cur = tracker.Start() - 1
+ }
+
+ estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+ splitPt := cur + int64(math.Ceil(math.Max(1,
float64(estimatedEnd-cur)*(fraction))))
+ log.Infof(context.Background(), "Split using estimatedEnd,
estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt)
+ if splitPt > estimatedEnd {
+ return tracker.rest, nil, nil
+ }
+ if splitPt < tracker.rest.Start {
+ return tracker.rest, nil, nil
+ }
+ residual = Restriction{Start: splitPt, End: tracker.End()}
+ tracker.rest.End = splitPt
+ return tracker.rest, residual, nil
+}
+
+// GetProgress reports progress based on the claimed size and unclaimed sizes
of the restriction.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+ log.Infof(context.Background(), "PROGRESS: tracker: %#v", tracker)
+
+ // If current tracking range is no longer growable, split it as a
normal range.
Review Comment:
```suggestion
// If current tracking range is no longer growable, get progress as if
a normal range.
```
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator provides the estimated end offset of the range. Users
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+ // Estimate is called to get the end offset in TrySplit() or
GetProgress() functions.
+ //
+ // The end offset is exclusive for the range. The estimated end is not
required to
+ // monotonically increase as it will only be taken into consideration
when the
+ // estimated end offset is larger than the current position.
+ // Returning math.MaxInt64 as the estimate implies the largest possible
position for the range
+ // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be
provided.
+ //
+ // Providing a good estimate is important for an accurate progress
signal and will impact
+ // splitting decisions by the runner.
+ Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that
this tracker makes
+// no assumptions about the positions of blocks within the range, so users
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+ Tracker
+ rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+ return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+ return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ log.Infof(context.Background(), "Done in TrySplit(%f)",
fraction)
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+ log.Infof(context.Background(), "Doing the normal OffsetTracker
TrySplit(%f)", fraction)
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ var cur int64
+ if tracker.attempted != -1 {
+ cur = tracker.attempted
+ } else {
+ cur = tracker.Start() - 1
+ }
+
+ estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+ splitPt := cur + int64(math.Ceil(math.Max(1,
float64(estimatedEnd-cur)*(fraction))))
+ log.Infof(context.Background(), "Split using estimatedEnd,
estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt)
+ if splitPt > estimatedEnd {
+ return tracker.rest, nil, nil
+ }
+ if splitPt < tracker.rest.Start {
+ return tracker.rest, nil, nil
+ }
+ residual = Restriction{Start: splitPt, End: tracker.End()}
+ tracker.rest.End = splitPt
+ return tracker.rest, residual, nil
+}
+
+// GetProgress reports progress based on the claimed size and unclaimed sizes
of the restriction.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+ log.Infof(context.Background(), "PROGRESS: tracker: %#v", tracker)
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 || tracker.End() == tracker.Start() {
Review Comment:
Same questions as above about these conditions
Issue Time Tracking
-------------------
Worklog Id: (was: 774709)
Time Spent: 2h 10m (was: 2h)
> Implement Growable Tracker for Go SDK
> -------------------------------------
>
> Key: BEAM-14511
> URL: https://issues.apache.org/jira/browse/BEAM-14511
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Ritesh Ghorse
> Assignee: Ritesh Ghorse
> Priority: P2
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Add a growable tracker for
> [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60]
> Restriction in the Go SDK. This would be useful for streaming/unbounded
> restrictions in SDF.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)