[
https://issues.apache.org/jira/browse/BEAM-14511?focusedWorklogId=775069&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-775069
]
ASF GitHub Bot logged work on BEAM-14511:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 26/May/22 14:12
Start Date: 26/May/22 14:12
Worklog Time Spent: 10m
Work Description: riteshghorse commented on code in PR #17754:
URL: https://github.com/apache/beam/pull/17754#discussion_r882714391
##########
sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go:
##########
@@ -299,3 +301,225 @@ func TestTracker_TrySplit(t *testing.T) {
})
}
}
+
+type offsetRangeEndEstimator struct {
+ EstimateRangeEnd int64
+}
+
+// Estimate provides the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) Estimate() int64 {
+ return o.EstimateRangeEnd
+}
+
+// SetEstimateRangeEnd sets the estimated end for unbounded offset range.
+func (o *offsetRangeEndEstimator) SetEstimateRangeEnd(rangeEnd int64) {
+ o.EstimateRangeEnd = rangeEnd
+}
+
+// TestNewGrowableTracker_Bad tests the behavior of NewGrowableTracker when
wrong arguments are passed.
+func TestNewGrowableTracker_Bad(t *testing.T) {
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ _, err := NewGrowableTracker(rest, nil)
+ if err == nil {
+ t.Errorf("NewGrowableTracker() expected to throw error.")
+ }
+}
+
+// TestGrowableTracker_TryClaim tests the TryClaim method for GrowableTracker.
+func TestGrowableTracker_TryClaim(t *testing.T) {
+ estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ tracker, err := NewGrowableTracker(rest, &estimator)
+ if err != nil {
+ t.Fatalf("error creating new GrowableTracker: %v", err)
+ }
+
+ if !tracker.TryClaim(int64(10)) {
+ t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+ }
+ if !tracker.TryClaim(int64(100)) {
+ t.Errorf("tracker.TryClaim(10) = %v, want: %v", false, true)
+ }
+ if tracker.TryClaim(int64(math.MaxInt64)) {
+ t.Errorf("tracker.TryClaim(math.MaxInt64) = %v, want: %v, %v",
true, false, tracker.err)
+ }
+ if !tracker.IsDone() {
+ t.Errorf("tracker has done all work, but IsDone() returns
false")
+ }
+}
+
+// TestGrowableTracker_SplitBeforeStart tests TrySplit() method for
GrowableTracker
+// before claiming anything.
+func TestGrowableTracker_SplitBeforeStart(t *testing.T) {
+ estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ tracker, err := NewGrowableTracker(rest, &estimator)
+ if err != nil {
+ t.Fatalf("error creating new GrowableTracker: %v", err)
+ }
+ estimator.SetEstimateRangeEnd(10)
+ p, r, _ := tracker.TrySplit(0)
+
+ expected := Restriction{0, 0}
+ if p.(Restriction) != expected {
+ t.Errorf("wrong primaries after TrySplit(0), got: %v, want:
%v", p.(Restriction), expected)
+ }
+ if tracker.GetRestriction().(Restriction) != expected {
+ t.Errorf("wrong restriction tracked by tracker after
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction),
expected)
+ }
+ res := Restriction{0, math.MaxInt64}
+ if res != r.(Restriction) {
+ t.Errorf("wrong residual TrySplit(0), got: %v, want: %v",
r.(Restriction), expected)
+ }
+}
+
+// TestGrowableTracker_CheckpointJustStarted tests TryClaim and TrySplit
+// for GrowableTracker.
+func TestGrowableTracker_CheckpointJustStarted(t *testing.T) {
+ estimator := offsetRangeEndEstimator{EstimateRangeEnd: 0}
+ rest := Restriction{Start: 0, End: math.MaxInt64}
+ tracker, err := NewGrowableTracker(rest, &estimator)
+ if err != nil {
+ t.Fatalf("error creating new GrowableTracker: %v", err)
+ }
+ if !tracker.TryClaim(int64(5)) {
+ t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+ }
+ estimator.SetEstimateRangeEnd(0)
+ p, r, _ := tracker.TrySplit(0)
+ if tracker.IsDone() {
+ t.Fatal("tracker not done yet, , but IsDone() returns true")
+ }
+
+ expPr := Restriction{0, 6}
+ if p.(Restriction) != expPr {
+ t.Errorf("wrong primaries after TrySplit(0), got: %v, want:
%v", p.(Restriction), expPr)
+ }
+ if tracker.GetRestriction().(Restriction) != expPr {
+ t.Errorf("wrong restriction tracked by tracker after
TrySplit(0), got: %v, want: %v", tracker.GetRestriction().(Restriction), expPr)
+ }
+ expRes := Restriction{6, math.MaxInt64}
+ if r.(Restriction) != expRes {
+ t.Errorf("wrong residual TrySplit(0), got: %v, want: %v",
r.(Restriction), expRes)
+ }
+
+ tracker, err = NewGrowableTracker(rest, &estimator)
+ if err != nil {
+ t.Fatalf("error creating new GrowableTracker: %v", err)
+ }
+ if !tracker.TryClaim(int64(5)) {
+ t.Fatal("tracker.TryClaim(int64(5)) should've claimed.")
+ }
+ estimator.SetEstimateRangeEnd(20)
+ p, r, _ = tracker.TrySplit(0)
+ if tracker.IsDone() {
+ t.Fatal("tracker not done yet, , but IsDone() returns true")
+ }
+ if p.(Restriction) != expPr {
+ t.Errorf("wrong primaries after TrySplit(0), got: %v, want:
%v", p.(Restriction), expPr)
+ }
+ if r.(Restriction) != expRes {
+ t.Errorf("wrong residual TrySplit(0), got: %v, want: %v",
r.(Restriction), expRes)
+ }
+}
+
+// TestGrowableTracker_Split tests TrySplit method for GrowableTracker
+// in regards to returned primary and residual.
+func TestGrowableTracker_Split(t *testing.T) {
Review Comment:
Done
Issue Time Tracking
-------------------
Worklog Id: (was: 775069)
Time Spent: 4h 10m (was: 4h)
> Implement Growable Tracker for Go SDK
> -------------------------------------
>
> Key: BEAM-14511
> URL: https://issues.apache.org/jira/browse/BEAM-14511
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Ritesh Ghorse
> Assignee: Ritesh Ghorse
> Priority: P2
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> Add a growable tracker for
> [OffsetRange|https://github.com/apache/beam/blob/3e683606d9a03e7da3d37a83eb16c3a6b96068cd/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go#L60]
> Restriction in the Go SDK. This would be useful for streaming/unbounded
> restrictions in SDF.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)