youngoli commented on a change in pull request #16980:
URL: https://github.com/apache/beam/pull/16980#discussion_r820365091
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn_test.go
##########
@@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) {
}
}
+func TestRegisterCallback(t *testing.T) {
+ bf := bundleFinalizer{
+ callbacks: []bundleFinalizationCallback{},
+ lastValidCallback: time.Now(),
+ }
+ testVar := 0
+ bf.RegisterCallback(500*time.Minute, func() error {
+ testVar += 5
+ return nil
+ })
+ bf.RegisterCallback(2*time.Minute, func() error {
+ testVar = 25
+ return nil
+ })
+ callbackErr := errors.New("Callback error")
+ bf.RegisterCallback(2*time.Minute, func() error {
+ return callbackErr
+ })
+
+ // We can't do exact equality since this relies on real time, we'll
give it a broad range
+ if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) ||
bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
+ t.Errorf("RegisterCallback() lastValidCallback set to %v, want
about 500 minutes", bf.lastValidCallback)
+ }
+ if got, want := len(bf.callbacks), 3; got != want {
+ t.Fatalf("RegisterCallback() called twice, got %v callbacks,
want %v", got, want)
Review comment:
Nit: Looks like a lot of these error messages were written with the
wrong number of callbacks in mind (like here says it was called twice when it
was called three times). For the most part though, I think you can avoid
hardcoding that into the error message entirely. Something like "callbacks in
bundleFinalizer does not match number of calls to RegisterCallback(), got %v
callbacks, want %v". Wordier but doesn't need to be changed whenever the test
changes.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn_test.go
##########
@@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) {
}
}
+func TestRegisterCallback(t *testing.T) {
+ bf := bundleFinalizer{
+ callbacks: []bundleFinalizationCallback{},
+ lastValidCallback: time.Now(),
+ }
+ testVar := 0
+ bf.RegisterCallback(500*time.Minute, func() error {
+ testVar += 5
+ return nil
+ })
+ bf.RegisterCallback(2*time.Minute, func() error {
+ testVar = 25
+ return nil
+ })
+ callbackErr := errors.New("Callback error")
+ bf.RegisterCallback(2*time.Minute, func() error {
+ return callbackErr
+ })
+
+ // We can't do exact equality since this relies on real time, we'll
give it a broad range
+ if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) ||
bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
+ t.Errorf("RegisterCallback() lastValidCallback set to %v, want
about 500 minutes", bf.lastValidCallback)
+ }
+ if got, want := len(bf.callbacks), 3; got != want {
+ t.Fatalf("RegisterCallback() called twice, got %v callbacks,
want %v", got, want)
+ }
+
+ if err := bf.callbacks[0].callback(); err != nil {
+ t.Errorf("RegisterCallback() first callback returned error %v,
want nil", err)
+ }
+ if got, want := testVar, 5; got != want {
+ t.Errorf("RegisterCallback() first callback set testvar to %v,
want %v", got, want)
+ }
+ if err := bf.callbacks[1].callback(); err != nil {
+ t.Errorf("RegisterCallback() second callback returned error %v,
want nil", err)
+ }
+ if got, want := testVar, 25; got != want {
+ t.Errorf("RegisterCallback() second callback set testvar to %v,
want %v", got, want)
+ }
+ if err := bf.callbacks[2].callback(); err != callbackErr {
+ t.Errorf("RegisterCallback() second callback returned error %v,
want %v", err, callbackErr)
Review comment:
Nit: This is actually the third callback. Although it might be easier to
add the index to the format string instead.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/plan.go
##########
@@ -131,6 +141,41 @@ func (p *Plan) Execute(ctx context.Context, id string,
manager DataContext) erro
return nil
}
+func (p *Plan) Finalize(ctx context.Context, id string) error {
Review comment:
Remember to document any exported functions, we get linter errors
otherwise.
##########
File path: sdks/go/pkg/beam/core/runtime/exec/plan.go
##########
@@ -131,6 +141,41 @@ func (p *Plan) Execute(ctx context.Context, id string,
manager DataContext) erro
return nil
}
+func (p *Plan) Finalize(ctx context.Context, id string) error {
+ if p.status != Up {
+ return errors.Errorf("invalid status for plan %v: %v", p.id,
p.status)
+ }
+ failedIndices := []int{}
+ for idx, bfc := range p.bf.callbacks {
+ if time.Now().Before(bfc.validUntil) {
+ if err := bfc.callback(); err != nil {
+ failedIndices = append(failedIndices, idx)
+ }
+ }
+ }
+
+ newFinalizer := bundleFinalizer{
+ callbacks: []bundleFinalizationCallback{},
+ lastValidCallback: time.Now(),
+ }
+
+ for _, idx := range failedIndices {
+ newFinalizer.callbacks = append(newFinalizer.callbacks,
p.bf.callbacks[idx])
+ if
newFinalizer.lastValidCallback.Before(p.bf.callbacks[idx].validUntil) {
+ newFinalizer.lastValidCallback =
p.bf.callbacks[idx].validUntil
+ }
+ }
+
+ if len(failedIndices) > 0 {
+ return errors.Errorf("Plan %v failed %v callbacks", p.ID(),
len(failedIndices))
+ }
+ return nil
+}
+
+func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time {
Review comment:
The id parameter isn't being used, can we remove it?
##########
File path: sdks/go/pkg/beam/core/runtime/exec/plan.go
##########
@@ -131,6 +141,41 @@ func (p *Plan) Execute(ctx context.Context, id string,
manager DataContext) erro
return nil
}
+func (p *Plan) Finalize(ctx context.Context, id string) error {
+ if p.status != Up {
+ return errors.Errorf("invalid status for plan %v: %v", p.id,
p.status)
+ }
+ failedIndices := []int{}
+ for idx, bfc := range p.bf.callbacks {
+ if time.Now().Before(bfc.validUntil) {
+ if err := bfc.callback(); err != nil {
+ failedIndices = append(failedIndices, idx)
+ }
+ }
+ }
+
+ newFinalizer := bundleFinalizer{
Review comment:
Not sure if I missed something, but it looks like newFinalizer doesn't
actually do anything. It gets populated with the failed callbacks, but then
this function just returns without actually using it or storing it anywhere.
##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -506,6 +563,12 @@ func (c *control) handleInstruction(ctx context.Context,
req *fnpb.InstructionRe
func (c *control) getPlanOrResponse(ctx context.Context, kind string, instID,
ref instructionID) (*exec.Plan, *metrics.Store, *fnpb.InstructionResponse) {
c.mu.Lock()
plan, ok := c.active[ref]
+ if !ok {
+ awaitingFinalization, ok := c.awaitingFinalization[ref]
Review comment:
I'm not 100% sure, but I vaguely recall having an issue where using :=
in an inner scope would create a new variable of the same name as an outer
scope variable. In this case, this inner ok would be a separate variable than
the outer one, so like like lower down in line 579, the outer ok is still false
and it would return a failure.
Regardless of whether this is intended behavior or not, I think you should
rename this for clarity (something like ok2 is fine).
##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn_test.go
##########
@@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) {
}
}
+func TestRegisterCallback(t *testing.T) {
+ bf := bundleFinalizer{
+ callbacks: []bundleFinalizationCallback{},
+ lastValidCallback: time.Now(),
+ }
+ testVar := 0
+ bf.RegisterCallback(500*time.Minute, func() error {
+ testVar += 5
+ return nil
+ })
+ bf.RegisterCallback(2*time.Minute, func() error {
+ testVar = 25
+ return nil
+ })
+ callbackErr := errors.New("Callback error")
+ bf.RegisterCallback(2*time.Minute, func() error {
+ return callbackErr
+ })
+
+ // We can't do exact equality since this relies on real time, we'll
give it a broad range
+ if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) ||
bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
+ t.Errorf("RegisterCallback() lastValidCallback set to %v, want
about 500 minutes", bf.lastValidCallback)
+ }
+ if got, want := len(bf.callbacks), 3; got != want {
+ t.Fatalf("RegisterCallback() called twice, got %v callbacks,
want %v", got, want)
+ }
+
+ if err := bf.callbacks[0].callback(); err != nil {
+ t.Errorf("RegisterCallback() first callback returned error %v,
want nil", err)
Review comment:
Errors should usually be wrapped at the end of the string, separated by
a colon, whenever possible. For example: "RegisterCallback() first callback
returned unexpected error: %v".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]