This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bf4f64158cf [BEAM-14511] Growable Tracker for Go SDK (#17754)
bf4f64158cf is described below

commit bf4f64158cfa79cd3c857d8aeaf8c6c5b755a430
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Wed Jun 1 22:04:35 2022 -0400

    [BEAM-14511] Growable Tracker for Go SDK (#17754)
---
 .../beam/io/rtrackers/offsetrange/offsetrange.go   | 137 +++++++++++-
 .../io/rtrackers/offsetrange/offsetrange_test.go   | 232 ++++++++++++++++++++-
 2 files changed, 358 insertions(+), 11 deletions(-)

diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
index 0eef03d6f43..1aa76fe4ced 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
@@ -23,6 +23,7 @@ import (
        "bytes"
        "encoding/binary"
        "errors"
+       "fmt"
        "math"
        "reflect"
 
@@ -33,6 +34,7 @@ import (
 func init() {
        runtime.RegisterType(reflect.TypeOf((*Tracker)(nil)))
        runtime.RegisterType(reflect.TypeOf((*Restriction)(nil)).Elem())
+       runtime.RegisterType(reflect.TypeOf((*GrowableTracker)(nil)))
        runtime.RegisterFunction(restEnc)
        runtime.RegisterFunction(restDec)
        coder.RegisterCoder(reflect.TypeOf((*Restriction)(nil)).Elem(), 
restEnc, restDec)
@@ -122,19 +124,21 @@ func (r Restriction) Size() float64 {
 // no assumptions about the positions of blocks within the range, so users 
must handle validation
 // of block positions if needed.
 type Tracker struct {
-       rest    Restriction
-       claimed int64 // Tracks the last claimed position.
-       stopped bool  // Tracks whether TryClaim has indicated to stop 
processing elements.
-       err     error
+       rest      Restriction
+       claimed   int64 // Tracks the last claimed position.
+       stopped   bool  // Tracks whether TryClaim has indicated to stop 
processing elements.
+       attempted int64 // Tracks the last attempted position to claim.
+       err       error
 }
 
 // NewTracker is a constructor for an Tracker given a start and end range.
 func NewTracker(rest Restriction) *Tracker {
        return &Tracker{
-               rest:    rest,
-               claimed: rest.Start - 1,
-               stopped: false,
-               err:     nil,
+               rest:      rest,
+               claimed:   rest.Start - 1,
+               attempted: -1,
+               stopped:   false,
+               err:       nil,
        }
 }
 
@@ -154,7 +158,7 @@ func (tracker *Tracker) TryClaim(rawPos interface{}) bool {
        }
 
        pos := rawPos.(int64)
-
+       tracker.attempted = pos
        if pos < tracker.rest.Start {
                tracker.stopped = true
                tracker.err = errors.New("position claimed is out of bounds of 
the restriction")
@@ -212,7 +216,7 @@ func (tracker *Tracker) GetProgress() (done, remaining 
float64) {
 
 // IsDone returns true if the most recent claimed element is at or past the 
end of the restriction
 func (tracker *Tracker) IsDone() bool {
-       return tracker.err == nil && (tracker.claimed+1) >= tracker.rest.End
+       return tracker.err == nil && (tracker.claimed+1 >= tracker.rest.End || 
tracker.rest.Start >= tracker.rest.End)
 }
 
 // GetRestriction returns a copy of the tracker's underlying 
offsetrange.Restriction.
@@ -225,3 +229,116 @@ func (tracker *Tracker) GetRestriction() interface{} {
 func (tracker *Tracker) IsBounded() bool {
        return true
 }
+
+// RangeEndEstimator provides the estimated end offset of the range. Users 
must implement this interface to
+// use the offsetrange.GrowableTracker.
+type RangeEndEstimator interface {
+       // Estimate is called to get the end offset in TrySplit() and GetProgress().
+       //
+       // The end offset is exclusive for the range. The estimated end is not 
required to
+       // monotonically increase as it will only be taken into consideration 
when the
+       // estimated end offset is larger than the current position.
+       // Returning math.MaxInt64 as the estimate implies the largest possible 
position for the range
+       // is math.MaxInt64 - 1.
+       //
+       // Providing a good estimate is important for an accurate progress 
signal and will impact
+       // splitting decisions by the runner.
+       Estimate() int64
+}
+
+// GrowableTracker tracks a growable offset range restriction that can be 
represented as a range of integer values,
+// for example for byte offsets in a file, or indices in an array. Note that 
this tracker makes
+// no assumptions about the positions of blocks within the range, so users 
must handle validation
+// of block positions if needed.
+type GrowableTracker struct {
+       Tracker
+       rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker creates a GrowableTracker for handling a growable offset 
range.
+// math.MaxInt64 is used as the end of the range to indicate infinity for an 
unbounded range.
+//
+// An OffsetRange is considered growable when the end offset could grow (or 
change)
+// during execution time (e.g. Kafka topic partition offset, appended file, 
...).
+//
+// The growable range is marked as done by claiming math.MaxInt64-1.
+//
+// For bounded restrictions, this tracker works the same as 
offsetrange.Tracker.
+// Use that directly if you have no need of estimating the end of a bound.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator) 
(*GrowableTracker, error) {
+       if rangeEndEstimator == nil {
+               return nil, fmt.Errorf("param rangeEndEstimator cannot be nil. 
Implementing offsetrange.RangeEndEstimator may be required")
+       }
+       return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End: 
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start returns the starting range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) Start() int64 {
+       return tracker.GetRestriction().(Restriction).Start
+}
+
+// End returns the end range of the restriction tracked by a tracker.
+func (tracker *GrowableTracker) End() int64 {
+       return tracker.GetRestriction().(Restriction).End
+}
+
+func max(x, y int64) int64 {
+       if x > y {
+               return x
+       }
+       return y
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of 
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual 
interface{}, err error) {
+       if tracker.stopped || tracker.IsDone() {
+               return tracker.rest, nil, nil
+       }
+
+       // If current tracking range is no longer growable, split it as a 
normal range.
+       if tracker.End() != math.MaxInt64 {
+               return tracker.Tracker.TrySplit(fraction)
+       }
+
+       // If current range has been done, there is no more space to split.
+       if tracker.attempted == math.MaxInt64 {
+               return nil, nil, nil
+       }
+
+       cur := max(tracker.attempted, tracker.Start()-1)
+       estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+       splitPt := cur + int64(math.Ceil(math.Max(1, 
float64(estimatedEnd-cur)*(fraction))))
+       if splitPt > estimatedEnd {
+               return tracker.rest, nil, nil
+       }
+       residual = Restriction{Start: splitPt, End: tracker.End()}
+       tracker.rest.End = splitPt
+       return tracker.rest, residual, nil
+}
+
+// GetProgress reports progress based on the claimed size and unclaimed sizes 
of the restriction.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+       // If current tracking range is no longer growable, get its progress as 
a normal range.
+       if tracker.End() != math.MaxInt64 {
+               return tracker.Tracker.GetProgress()
+       }
+
+       estimatedEnd := tracker.rangeEndEstimator.Estimate()
+
+       if tracker.attempted == -1 {
+               done = 0
+               remaining = math.Max(0, float64(estimatedEnd-tracker.Start()))
+               return done, remaining
+       }
+
+       remaining = math.Max(0, 
float64(estimatedEnd)-float64(tracker.attempted))
+       done = float64((tracker.claimed + 1) - tracker.rest.Start)
+       return done, remaining
+}
+
+// IsBounded checks if the current restriction is bounded or not.
+func (tracker *GrowableTracker) IsBounded() bool {
+       return tracker.End() != math.MaxInt64
+}
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
index 4147dec97ac..15fbedb263f 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
@@ -17,8 +17,10 @@ package offsetrange
 
 import (
        "fmt"
-       "github.com/google/go-cmp/cmp"
+       "math"
        "testing"
+
+       "github.com/google/go-cmp/cmp"
 )
 
 // TestRestriction_EvenSplits tests various splits and checks that they all
@@ -368,3 +370,231 @@ func TestTracker_TrySplit_WithoutClaiming(t *testing.T) {
                })
        }
 }
+
+type offsetRangeEndEstimator struct {
+       EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+       return o.EstimateRangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when 
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       _, err := NewGrowableTracker(rest, nil)
+       if err == nil {
+               t.Errorf("NewGrowableTracker() should have failed when nil is 
passed as a paramter for RangeEndEstimator.")
+       }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tests := []struct {
+               claim  int64
+               result bool
+       }{
+               {
+                       claim:  0,
+                       result: true,
+               },
+               {
+                       claim:  10,
+                       result: true,
+               },
+               {
+                       claim:  100,
+                       result: true,
+               },
+               {
+                       claim:  math.MaxInt64,
+                       result: false,
+               },
+               {
+                       claim:  -1,
+                       result: false,
+               },
+       }
+
+       for _, test := range tests {
+               tracker, err := NewGrowableTracker(rest, &estimator)
+               if err != nil {
+                       t.Fatalf("error in creating a new GrowableTracker: %v", 
err)
+               }
+               if got, want := tracker.TryClaim(test.claim), test.result; got 
!= want {
+                       t.Errorf("tracker.TryClaim(%d) = %v, want: %v", 
test.claim, got, want)
+               }
+       }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for 
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+       estimator.EstimateRangeEnd = 10
+       gotP, gotR, err := tracker.TrySplit(0)
+       if err != nil {
+               t.Fatalf("error in tracker.TrySplit(0): %v", err)
+       }
+
+       want := Restriction{0, 0}
+       if got := gotP.(Restriction); got != want {
+               t.Errorf("wrong primaries after TrySplit(0), got: %v, want: 
%v", got, want)
+       }
+       if got := tracker.GetRestriction().(Restriction); got != want {
+               t.Errorf("wrong restriction tracked by tracker after 
TrySplit(0), got: %v, want: %v", got, want)
+       }
+       if got, want := gotR.(Restriction), (Restriction{0, math.MaxInt64}); 
got != want {
+               t.Errorf("wrong residual after TrySplit(0), got: %v, want: %v", 
got, want)
+       }
+}
+
+// TestGrowableTracker_Splits tests that TrySplit follows its contract, 
meaning that
+// splits don't lose any elements, split fractions are clamped to 0 or 1, and
+// that TrySplit always splits at the nearest integer greater than the given
+// fraction.
+func TestGrowableTracker_Splits(t *testing.T) {
+       tests := []struct {
+               rest              Restriction
+               claimed           int64
+               fraction          float64
+               rangeEndEstimator offsetRangeEndEstimator
+               // Index where we want the split to happen. This will be the end
+               // (exclusive) of the primary and first element of the residual.
+               splitPt int64
+       }{
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           0,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 10},
+                       splitPt:           5,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           100,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 100},
+                       splitPt:           101,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 
math.MaxInt64},
+                       claimed:           5,
+                       fraction:          0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 0},
+                       splitPt:           6,
+               },
+               {
+                       rest:              Restriction{Start: 0, End: 10},
+                       claimed:           5,
+                       fraction:          -0.5,
+                       rangeEndEstimator: 
offsetRangeEndEstimator{EstimateRangeEnd: 10},
+                       splitPt:           6,
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("(split at %v of [%v, %v])",
+                       test.fraction, test.claimed, test.rest.End), func(t 
*testing.T) {
+                       rt, err := NewGrowableTracker(test.rest, 
&test.rangeEndEstimator)
+                       if err != nil {
+                               t.Fatalf("error in creating a new growable 
tracker: %v", err)
+                       }
+                       ok := rt.TryClaim(test.claimed)
+                       if !ok {
+                               t.Fatalf("tracker failed on rt.TryClaim(%v)", 
test.claimed)
+                       }
+                       gotP, gotR, err := rt.TrySplit(test.fraction)
+                       if err != nil {
+                               t.Fatalf("tracker failed on split 
rt.TrySplit(%v): %v", test.fraction, err)
+                       }
+                       var wantP interface{} = Restriction{Start: 
test.rest.Start, End: test.splitPt}
+                       var wantR interface{} = Restriction{Start: 
test.splitPt, End: test.rest.End}
+                       if test.splitPt == test.rest.End {
+                               wantR = nil // When residuals are empty we 
should get nil.
+                       }
+                       if !cmp.Equal(gotP, wantP) {
+                               t.Errorf("split got incorrect primary: got: %v, 
want: %v", gotP, wantP)
+                       }
+                       if !cmp.Equal(gotR, wantR) {
+                               t.Errorf("split got incorrect residual: got: 
%v, want: %v", gotR, wantR)
+                       }
+               })
+       }
+}
+
+// TestGrowableTracker_IsBounded tests IsBounded method for GrowableTracker.
+func TestGrowableTracker_IsBounded(t *testing.T) {
+       estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+       rest := Restriction{Start: 0, End: math.MaxInt64}
+       tracker, err := NewGrowableTracker(rest, &estimator)
+       if err != nil {
+               t.Fatalf("error creating new GrowableTracker: %v", err)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker is bounded, want unbounded 
initially.")
+       }
+
+       if got, want := tracker.TryClaim(int64(0)), true; got != want {
+               t.Errorf("tracker.TryClaim(0) = %v, want: %v", got, want)
+       }
+
+       if tracker.IsBounded() {
+               t.Errorf("GrowableTracker should've been unbounded.")
+       }
+
+       estimator.EstimateRangeEnd = 16
+       tracker.TrySplit(0.5)
+       if !tracker.IsBounded() {
+               t.Errorf("tracker should've been bounded after split")
+       }
+}
+
+// TestGrowableTracker_Progress tests GetProgress method for GrowableTracker.
+func TestGrowableTracker_Progress(t *testing.T) {
+       estimator := offsetRangeEndEstimator{0}
+       tests := []struct {
+               tracker         GrowableTracker
+               done, remaining float64
+               estimator       int64
+       }{
+               {
+                       tracker:   GrowableTracker{Tracker{rest: 
Restriction{Start: 0, End: math.MaxInt64}, claimed: 20, attempted: 20}, 
&estimator},
+                       done:      21,
+                       remaining: 0,
+                       estimator: 0,
+               },
+               {
+                       tracker:   GrowableTracker{Tracker{rest: 
Restriction{Start: 0, End: 20}, claimed: 15, attempted: 15}, &estimator},
+                       done:      16,
+                       remaining: 4,
+                       estimator: 0,
+               },
+               {
+                       tracker:   GrowableTracker{Tracker{rest: 
Restriction{Start: 0, End: math.MaxInt64}, claimed: 20, attempted: 20}, 
&estimator},
+                       done:      21,
+                       remaining: math.MaxInt64 - 20,
+                       estimator: math.MaxInt64,
+               },
+       }
+
+       for _, test := range tests {
+               estimator.EstimateRangeEnd = test.estimator
+               done, remaining := test.tracker.GetProgress()
+               if got, want := done, test.done; got != want {
+                       t.Errorf("wrong amount of work done, got: %v, want: 
%v", got, want)
+               }
+               if got, want := remaining, test.remaining; got != want {
+                       t.Errorf("wrong amount of work remaining, got:%v, want: 
%v", got, want)
+               }
+       }
+}

Reply via email to