This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bf4f64158cf [BEAM-14511] Growable Tracker for Go SDK (#17754)
bf4f64158cf is described below
commit bf4f64158cfa79cd3c857d8aeaf8c6c5b755a430
Author: Ritesh Ghorse <[email protected]>
AuthorDate: Wed Jun 1 22:04:35 2022 -0400
[BEAM-14511] Growable Tracker for Go SDK (#17754)
---
.../beam/io/rtrackers/offsetrange/offsetrange.go | 137 +++++++++++-
.../io/rtrackers/offsetrange/offsetrange_test.go | 232 ++++++++++++++++++++-
2 files changed, 358 insertions(+), 11 deletions(-)
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
index 0eef03d6f43..1aa76fe4ced 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go
@@ -23,6 +23,7 @@ import (
"bytes"
"encoding/binary"
"errors"
+ "fmt"
"math"
"reflect"
@@ -33,6 +34,7 @@ import (
func init() {
runtime.RegisterType(reflect.TypeOf((*Tracker)(nil)))
runtime.RegisterType(reflect.TypeOf((*Restriction)(nil)).Elem())
+ runtime.RegisterType(reflect.TypeOf((*GrowableTracker)(nil)))
runtime.RegisterFunction(restEnc)
runtime.RegisterFunction(restDec)
coder.RegisterCoder(reflect.TypeOf((*Restriction)(nil)).Elem(),
restEnc, restDec)
@@ -122,19 +124,21 @@ func (r Restriction) Size() float64 {
// no assumptions about the positions of blocks within the range, so users
must handle validation
// of block positions if needed.
type Tracker struct {
- rest Restriction
- claimed int64 // Tracks the last claimed position.
- stopped bool // Tracks whether TryClaim has indicated to stop
processing elements.
- err error
+ rest Restriction
+ claimed int64 // Tracks the last claimed position.
+ stopped bool // Tracks whether TryClaim has indicated to stop
processing elements.
+ attempted int64 // Tracks the last attempted position to claim.
+ err error
}
// NewTracker is a constructor for a Tracker given a start and end range.
func NewTracker(rest Restriction) *Tracker {
return &Tracker{
- rest: rest,
- claimed: rest.Start - 1,
- stopped: false,
- err: nil,
+ rest: rest,
+ claimed: rest.Start - 1,
+ attempted: -1,
+ stopped: false,
+ err: nil,
}
}
@@ -154,7 +158,7 @@ func (tracker *Tracker) TryClaim(rawPos interface{}) bool {
}
pos := rawPos.(int64)
-
+ tracker.attempted = pos
if pos < tracker.rest.Start {
tracker.stopped = true
tracker.err = errors.New("position claimed is out of bounds of
the restriction")
@@ -212,7 +216,7 @@ func (tracker *Tracker) GetProgress() (done, remaining
float64) {
// IsDone returns true if no error has occurred and either the last position in the
// restriction has been claimed or the restriction is empty (Start >= End).
func (tracker *Tracker) IsDone() bool {
- return tracker.err == nil && (tracker.claimed+1) >= tracker.rest.End
+ return tracker.err == nil && (tracker.claimed+1 >= tracker.rest.End ||
tracker.rest.Start >= tracker.rest.End)
}
// GetRestriction returns a copy of the tracker's underlying
offsetrange.Restriction.
@@ -225,3 +229,116 @@ func (tracker *Tracker) GetRestriction() interface{} {
func (tracker *Tracker) IsBounded() bool {
return true
}
+
+// RangeEndEstimator supplies an estimate of where a growable range currently ends.
+// Anyone using offsetrange.GrowableTracker has to provide an implementation.
+type RangeEndEstimator interface {
+	// Estimate returns the estimated end offset of the range. GrowableTracker calls it
+	// from TrySplit and GetProgress while the range is still growable.
+	//
+	// The returned offset is exclusive. Successive estimates do not have to increase
+	// monotonically, because an estimate only has an effect when it is larger than
+	// the tracker's current position.
+	// An estimate of math.MaxInt64 means the largest possible position in the range
+	// is math.MaxInt64 - 1.
+	//
+	// The quality of the estimate matters: it drives the progress signal and the
+	// splitting decisions made by the runner.
+	Estimate() int64
+}
+
+// GrowableTracker is a tracker for an offset range restriction whose end may grow,
+// where the range is expressed as integer values such as byte offsets in a file or
+// indices in an array. It embeds Tracker and adds an estimator for the range end.
+// The tracker assumes nothing about where blocks sit inside the range; callers that
+// need block positions validated must do so themselves.
+type GrowableTracker struct {
+	// Tracker holds the restriction and the claim state.
+	Tracker
+	// rangeEndEstimator is consulted for the end of the range while it is growable.
+	rangeEndEstimator RangeEndEstimator
+}
+
+// NewGrowableTracker creates a GrowableTracker for handling a growable offset
range.
+// math.MaxInt64 is used as the end of the range to indicate infinity for an
unbounded range.
+//
+// An OffsetRange is considered growable when the end offset could grow (or
change)
+// during execution time (e.g. Kafka topic partition offset, appended file,
...).
+//
+// The growable range is marked as done by claiming math.MaxInt64-1.
+//
+// For bounded restrictions, this tracker works the same as
offsetrange.Tracker.
+// Use that directly if you have no need of estimating the end of a bound.
+func NewGrowableTracker(rest Restriction, rangeEndEstimator RangeEndEstimator)
(*GrowableTracker, error) {
+ if rangeEndEstimator == nil {
+ return nil, fmt.Errorf("param rangeEndEstimator cannot be nil.
Implementing offsetrange.RangeEndEstimator may be required")
+ }
+ return &GrowableTracker{*NewTracker(Restriction{Start: rest.Start, End:
rest.End}), rangeEndEstimator}, nil
+}
+
+// Start reports the start offset of the restriction this tracker is tracking.
+func (tracker *GrowableTracker) Start() int64 {
+	current := tracker.GetRestriction().(Restriction)
+	return current.Start
+}
+
+// End reports the end offset of the restriction this tracker is tracking.
+func (tracker *GrowableTracker) End() int64 {
+	current := tracker.GetRestriction().(Restriction)
+	return current.End
+}
+
+// max returns the larger of the two int64 values.
+func max(x, y int64) int64 {
+	if y >= x {
+		return y
+	}
+	return x
+}
+
+// TrySplit splits at the nearest integer greater than the given fraction of
the remainder. If the
+// fraction given is outside of the [0, 1] range, it is clamped to 0 or 1.
+func (tracker *GrowableTracker) TrySplit(fraction float64) (primary, residual
interface{}, err error) {
+ if tracker.stopped || tracker.IsDone() {
+ return tracker.rest, nil, nil
+ }
+
+ // If current tracking range is no longer growable, split it as a
normal range.
+ if tracker.End() != math.MaxInt64 {
+ return tracker.Tracker.TrySplit(fraction)
+ }
+
+ // If current range has been done, there is no more space to split.
+ if tracker.attempted == math.MaxInt64 {
+ return nil, nil, nil
+ }
+
+ cur := max(tracker.attempted, tracker.Start()-1)
+ estimatedEnd := max(tracker.rangeEndEstimator.Estimate(), cur+1)
+
+ splitPt := cur + int64(math.Ceil(math.Max(1,
float64(estimatedEnd-cur)*(fraction))))
+ if splitPt > estimatedEnd {
+ return tracker.rest, nil, nil
+ }
+ residual = Restriction{Start: splitPt, End: tracker.End()}
+ tracker.rest.End = splitPt
+ return tracker.rest, residual, nil
+}
+
+// GetProgress reports the amount of work done and the amount remaining.
+//
+// Once the range is no longer growable (its end is not math.MaxInt64) the result comes
+// from Tracker.GetProgress. While the range is growable the estimator supplies the end:
+// before any claim attempt, done is 0 and remaining is the distance from the start to
+// the estimated end; afterwards done is the claimed size of the restriction and
+// remaining is the distance from the last attempted position to the estimated end.
+// In both cases remaining is never negative.
+func (tracker *GrowableTracker) GetProgress() (done, remaining float64) {
+	growable := tracker.End() == math.MaxInt64
+	if !growable {
+		return tracker.Tracker.GetProgress()
+	}
+
+	est := tracker.rangeEndEstimator.Estimate()
+
+	if tracker.attempted != -1 {
+		done = float64((tracker.claimed + 1) - tracker.rest.Start)
+		remaining = math.Max(0, float64(est)-float64(tracker.attempted))
+		return done, remaining
+	}
+
+	// Nothing has been attempted yet, so all of the estimated range remains.
+	return 0, math.Max(0, float64(est-tracker.Start()))
+}
+
+// IsBounded reports whether the current restriction is bounded, i.e. whether its end
+// is anything other than math.MaxInt64.
+func (tracker *GrowableTracker) IsBounded() bool {
+	unbounded := tracker.End() == math.MaxInt64
+	return !unbounded
+}
diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
index 4147dec97ac..15fbedb263f 100644
--- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
+++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go
@@ -17,8 +17,10 @@ package offsetrange
import (
"fmt"
- "github.com/google/go-cmp/cmp"
+ "math"
"testing"
+
+ "github.com/google/go-cmp/cmp"
)
// TestRestriction_EvenSplits tests various splits and checks that they all
@@ -368,3 +370,231 @@ func TestTracker_TrySplit_WithoutClaiming(t *testing.T) {
})
}
}
+
+// offsetRangeEndEstimator is a test estimator that returns a settable value.
+type offsetRangeEndEstimator struct {
+	// EstimateRangeEnd is the value handed back by Estimate.
+	EstimateRangeEnd int64
+}
+
+// Estimate returns the configured end estimate for an unbounded offset range.
+func (e *offsetRangeEndEstimator) Estimate() int64 {
+	return e.EstimateRangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ _, err := NewGrowableTracker(rest, nil)
+ if err == nil {
+ t.Errorf("NewGrowableTracker() should have failed when nil is
passed as a paramter for RangeEndEstimator.")
+ }
+}
+
+// TestGrowableTracker_TryClaim checks the result of one TryClaim call on a freshly
+// created unbounded GrowableTracker for a handful of positions.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+	unbounded := Restriction{Start: 0, End: math.MaxInt64}
+	est := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+	cases := []struct {
+		pos  int64
+		want bool
+	}{
+		{pos: 0, want: true},
+		{pos: 10, want: true},
+		{pos: 100, want: true},
+		{pos: math.MaxInt64, want: false},
+		{pos: -1, want: false},
+	}
+
+	for i := range cases {
+		tc := cases[i]
+		// Every case starts from a brand new tracker.
+		rt, err := NewGrowableTracker(unbounded, &est)
+		if err != nil {
+			t.Fatalf("error in creating a new GrowableTracker: %v", err)
+		}
+		got := rt.TryClaim(tc.pos)
+		if got != tc.want {
+			t.Errorf("tracker.TryClaim(%d) = %v, want: %v", tc.pos, got, tc.want)
+		}
+	}
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+ estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ tracker, err := NewGrowableTracker(rest, &estimator)
+ if err != nil {
+ t.Fatalf("error creating new GrowableTracker: %v", err)
+ }
+ estimator.EstimateRangeEnd = 10
+ gotP, gotR, err := tracker.TrySplit(0)
+ if err != nil {
+ t.Fatalf("error in tracker.TrySplit(0): %v", err)
+ }
+
+ want := Restriction{0, 0}
+ if got := gotP.(Restriction); got != want {
+ t.Errorf("wrong primaries after TrySplit(0), got: %v, want:
%v", got, want)
+ }
+ if got := tracker.GetRestriction().(Restriction); got != want {
+ t.Errorf("wrong restriction tracked by tracker after
TrySplit(0), got: %v, want: %v", got, want)
+ }
+ if got, want := gotR.(Restriction), (Restriction{0, math.MaxInt64});
got != want {
+ t.Errorf("wrong residual after TrySplit(0), got: %v, want: %v",
got, want)
+ }
+}
+
+// TestGrowableTracker_Splits checks the TrySplit contract: no elements are lost by a
+// split, split fractions are clamped to 0 or 1, and the split happens at the nearest
+// integer greater than the given fraction.
+func TestGrowableTracker_Splits(t *testing.T) {
+	growable := Restriction{Start: 0, End: math.MaxInt64}
+	cases := []struct {
+		rest      Restriction
+		claimed   int64
+		fraction  float64
+		estimator offsetRangeEndEstimator
+		// splitPt is where the split is expected: the exclusive end of the
+		// primary and the first element of the residual.
+		splitPt int64
+	}{
+		{rest: growable, claimed: 0, fraction: 0.5, estimator: offsetRangeEndEstimator{EstimateRangeEnd: 10}, splitPt: 5},
+		{rest: growable, claimed: 100, fraction: 0.5, estimator: offsetRangeEndEstimator{EstimateRangeEnd: 100}, splitPt: 101},
+		{rest: growable, claimed: 5, fraction: 0.5, estimator: offsetRangeEndEstimator{EstimateRangeEnd: 0}, splitPt: 6},
+		{rest: Restriction{Start: 0, End: 10}, claimed: 5, fraction: -0.5, estimator: offsetRangeEndEstimator{EstimateRangeEnd: 10}, splitPt: 6},
+	}
+	for _, tc := range cases {
+		tc := tc
+		name := fmt.Sprintf("(split at %v of [%v, %v])", tc.fraction, tc.claimed, tc.rest.End)
+		t.Run(name, func(t *testing.T) {
+			rt, err := NewGrowableTracker(tc.rest, &tc.estimator)
+			if err != nil {
+				t.Fatalf("error in creating a new growable tracker: %v", err)
+			}
+			if claimed := rt.TryClaim(tc.claimed); !claimed {
+				t.Fatalf("tracker failed on rt.TryClaim(%v)", tc.claimed)
+			}
+			gotP, gotR, err := rt.TrySplit(tc.fraction)
+			if err != nil {
+				t.Fatalf("tracker failed on split rt.TrySplit(%v): %v", tc.fraction, err)
+			}
+
+			var wantP interface{} = Restriction{Start: tc.rest.Start, End: tc.splitPt}
+			// An empty residual is reported as nil.
+			var wantR interface{}
+			if tc.splitPt != tc.rest.End {
+				wantR = Restriction{Start: tc.splitPt, End: tc.rest.End}
+			}
+
+			if !cmp.Equal(gotP, wantP) {
+				t.Errorf("split got incorrect primary: got: %v, want: %v", gotP, wantP)
+			}
+			if !cmp.Equal(gotR, wantR) {
+				t.Errorf("split got incorrect residual: got: %v, want: %v", gotR, wantR)
+			}
+		})
+	}
+}
+
+// TestGrowableTracker_IsBounded tests IsBounded method for GrowableTracker.
+func TestGrowableTracker_IsBounded(t *testing.T) {
+ estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ tracker, err := NewGrowableTracker(rest, &estimator)
+ if err != nil {
+ t.Fatalf("error creating new GrowableTracker: %v", err)
+ }
+
+ if tracker.IsBounded() {
+ t.Errorf("GrowableTracker is bounded, want unbounded
initially.")
+ }
+
+ if got, want := tracker.TryClaim(int64(0)), true; got != want {
+ t.Errorf("tracker.TryClaim(0) = %v, want: %v", got, want)
+ }
+
+ if tracker.IsBounded() {
+ t.Errorf("GrowableTracker should've been unbounded.")
+ }
+
+ estimator.EstimateRangeEnd = 16
+ tracker.TrySplit(0.5)
+ if !tracker.IsBounded() {
+ t.Errorf("tracker should've been bounded after split")
+ }
+}
+
+// TestGrowableTracker_Progress tests GetProgess method for GrowableTracker.
+func TestGrowableTracker_Progress(t *testing.T) {
+ estimator := offsetRangeEndEstimator{0}
+ tests := []struct {
+ tracker GrowableTracker
+ done, remaining float64
+ estimator int64
+ }{
+ {
+ tracker: GrowableTracker{Tracker{rest:
Restriction{Start: 0, End: math.MaxInt64}, claimed: 20, attempted: 20},
&estimator},
+ done: 21,
+ remaining: 0,
+ estimator: 0,
+ },
+ {
+ tracker: GrowableTracker{Tracker{rest:
Restriction{Start: 0, End: 20}, claimed: 15, attempted: 15}, &estimator},
+ done: 16,
+ remaining: 4,
+ estimator: 0,
+ },
+ {
+ tracker: GrowableTracker{Tracker{rest:
Restriction{Start: 0, End: math.MaxInt64}, claimed: 20, attempted: 20},
&estimator},
+ done: 21,
+ remaining: math.MaxInt64 - 20,
+ estimator: math.MaxInt64,
+ },
+ }
+
+ for _, test := range tests {
+ estimator.EstimateRangeEnd = test.estimator
+ done, remaining := test.tracker.GetProgress()
+ if got, want := done, test.done; got != want {
+ t.Errorf("wrong amount of work done, got: %v, want:
%v", got, want)
+ }
+ if got, want := remaining, test.remaining; got != want {
+ t.Errorf("wrong amount of work remaining, got:%v, want:
%v", got, want)
+ }
+ }
+}