[ 
https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=774723&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-774723
 ]

ASF GitHub Bot logged work on BEAM-14511:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/May/22 17:50
            Start Date: 25/May/22 17:50
    Worklog Time Spent: 10m 
      Work Description: jrmccluskey commented on code in PR #17754:
URL: https://github.com/apache/beam/pull/17754#discussion_r881944088


##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if !tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       if tracker.IsDone() {
+               t.Fatal("tracker not done yet, , but IsDone() returns true")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }

Review Comment:
   You should also check that the original tracker's `IsDone()` function 
returns `true` after a checkpoint split.



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go:
##########
@@ -223,3 +229,119 @@ func (tracker *Tracker) GetRestriction() interface{} {
 func (tracker *Tracker) IsBounded() bool {
        return true
 }
+
+// RangeEndEstimator provides the estimated end offset of the range. Users 
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+       // Estimate is called to get the end offset in TrySplit() or 
GetProgress() functions.
+       //
+       // The end offset is exclusive for the range. The estimated end is not 
required to
+       // monotonically increase as it will only be taken into consideration 
when the
+       // estimated end offset is larger than the current position.
+       // Returning math.MaxInt64 as the estimate implies the largest possible 
position for the range
+       // is math.MaxInt64 - 1. Return math.MinInt64 if an estimate can not be 
provided.
+       //
+       // Providing a good estimate is important for an accurate progress 
signal and will impact
+       // splitting decisions by the runner.
+       Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be 
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that 
this tracker makes
+// no assumptions about the positions of blocks within the range, so users 
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+       Tracker
+       rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker is a constructor for an GrowableTracker given a start 
and RangeEndEstimator.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) 
(*GrowableTracker, error) {
+       if rangeEndEstimator == nil {
+               return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. 
Implementing offsetrange.RangeEndEstimator may be required")
+       }
+       return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: 
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+       return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+       return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+       if x > y {
+               return x
+       }
+       return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of 
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual 
interface{}, err error) {
+       if tracker.stopped || tracker.IsDone() {
+               log.Infof(context.Background(), "Done in TrySplit(%f)", 
fraction)
+               return tracker.rest, nil, nil
+       }
+
+       // If current tracking range is no longer growable, split it as a 
normal range.
+       if tracker.End() != math.MaxInt64 || tracker.Start() == tracker.End() {
+               log.Infof(context.Background(), "Doing the normal OffsetTracker 
TrySplit(%f)", fraction)
+               return tracker.Tracker.TrySplit(fraction)
+       }
+
+       // If current range has been done, there is no more space to split.
+       if tracker.attempted != -1 && tracker.attempted == math.MaxInt64 {
+               return nil, nil, nil
+       }
+
+       var cur int64
+       if tracker.attempted != -1 {
+               cur = tracker.attempted
+       } else {
+               cur = tracker.Start() - 1
+       }
+
+       estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+       splitPt := cur + int64(math.Ceil(math.Max(1, 
float64(estimatedEnd-cur)*(fraction))))
+       log.Infof(context.Background(), "Split using estimatedEnd, 
estimatedEnd: %v, splitPt: %v ", estimatedEnd, splitPt)
+       if splitPt > estimatedEnd {
+               return tracker.rest, nil, nil
+       }
+       if splitPt < tracker.rest.Start {

Review Comment:
   Good catch; this condition doesn't appear to be reachable, given that `cur` is 
guaranteed to be at minimum Start-1 and the split point is at minimum cur + 1. 



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if !tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       if tracker.IsDone() {
+               t.Fatal("tracker not done yet, , but IsDone() returns true")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       tracker, err = NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(20)
+       p, r, _ = tracker.TrySplit(0)
+       if tracker.IsDone() {
+               t.Fatal("tracker not done yet, , but IsDone() returns true")
+       }
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+}
+
+// TestGrowableTracker_Split tests TrySplit method for GrowableTracker
+// in regards to returned primary and residual.
+func TestGrowableTracker_Split(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(0)) {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", false, true)
+       }
+
+       estimator.SetEstimateRangeEnd(16)
+
+       p, r, _ := tracker.TrySplit(0.5)
+       expPr := Restriction{0, 8}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes := Restriction{8, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       estimator.SetEstimateRangeEnd(12)
+       p, r, _ = tracker.TrySplit(0.5)
+       expPr = Restriction{0, 4}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0.5), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0.5), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expPr)
+       }
+       expRes = Restriction{4, 8}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0.5), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       if tracker.TryClaim(int64(4)) {
+               t.Errorf("tracker.TryClaim(4) = %v, want: %v", true, false)
+       }
+}
+
+// TestGrowableTracker_IsBounded tests IsBounded method for GrowableTracker.
+func TestGrowableTracker_IsBounded(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker is unbounded initially.")

Review Comment:
   ```suggestion
                t.Errorf("GrowableTracker is bounded, want unbounded initially")
   ```



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if !tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       if tracker.IsDone() {
+               t.Fatal("tracker not done yet, , but IsDone() returns true")

Review Comment:
   ```suggestion
                t.Fatal("tracker should not be done yet, but IsDone() returns 
true")
   ```



##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+       o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() expected to throw error.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if !tracker.TryClaim(int64(10)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if !tracker.TryClaim(int64(100)) {
+               t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+       }
+       if tracker.TryClaim(int64(math.MaxInt64)) {
+               t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v", 
true, false, tracker.err)
+       }
+       if !tracker.IsDone() {
+               t.Errorf("tracker has done all work, but IsDone() returns 
false")
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.SetEstimateRangeEnd(10)
+       p, r, _ := tracker.TrySplit(0)
+
+       expected := Restriction{0, 0}
+       if p.(Restriction) != expected {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expected)
+       }
+       if tracker.GetRestriction().(Restriction) != expected {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), 
expected)
+       }
+       res := Restriction{0, math.MaxInt64}
+       if res != r.(Restriction) {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expected)
+       }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(0)
+       p, r, _ := tracker.TrySplit(0)
+       if tracker.IsDone() {
+               t.Fatal("tracker not done yet, , but IsDone() returns true")
+       }
+
+       expPr := Restriction{0, 6}
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if tracker.GetRestriction().(Restriction) != expPr {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+       }
+       expRes := Restriction{6, math.MaxInt64}
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+
+       tracker, err = NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       if !tracker.TryClaim(int64(5)) {
+               t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+       }
+       estimator.SetEstimateRangeEnd(20)
+       p, r, _ = tracker.TrySplit(0)
+       if tracker.IsDone() {
+               t.Fatal("tracker not done yet, , but IsDone() returns true")
+       }
+       if p.(Restriction) != expPr {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", p.(Restriction), expPr)
+       }
+       if r.(Restriction) != expRes {
+               t.Errorf("wrong residual TrySplit(0), got: %v, want: %v", 
r.(Restriction), expRes)
+       }
+}
+
+// TestGrowableTracker_Split tests TrySplit method for GrowableTracker
+// in regards to returned primary and residual.
+func TestGrowableTracker_Split(t *testing.T) {

Review Comment:
   Consider generalizing this into a table-driven test to check a number of 
restriction sizes and split points. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 774723)
    Time Spent: 2h 20m  (was: 2h 10m)

> Implement Growable Tracker for Go SDK
> -------------------------------------
>
>                 Key: BEAM-14511
>                 URL: https://issues.apache.org/jira/browse/BEAM-14511
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Ritesh Ghorse
>            Assignee: Ritesh Ghorse
>            Priority: P2
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Add a growable tracker for 
> [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60]
>  Restriction in Go SDK. This would be useful for streaming/unbounded 
> restrictions in SDF.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to