This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch includeFailures in repository https://gitbox.apache.org/repos/asf/beam.git
commit 5111afe316c03c49a0fe6f90cedf6170f5ba87ce Author: Robert Burke <[email protected]> AuthorDate: Wed Jun 28 17:34:59 2023 +0200 Clean up user failure logic to not panic. --- .../beam/runners/prism/internal/execute_test.go | 14 ++++++++++++ sdks/go/pkg/beam/runners/prism/internal/stage.go | 12 ++++++++-- .../pkg/beam/runners/prism/internal/testdofns.go | 5 +++++ .../beam/runners/prism/internal/testdofns_test.go | 1 + .../beam/runners/prism/internal/worker/bundle.go | 7 ++++++ .../beam/runners/prism/internal/worker/worker.go | 26 +++++++++++++--------- 6 files changed, 52 insertions(+), 13 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go index 3f353f0626f..2da3972ff10 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go @@ -418,6 +418,20 @@ func TestRunner_Metrics(t *testing.T) { }) } +func TestFailure(t *testing.T) { + initRunner(t) + + p, s := beam.NewPipelineWithRoot() + imp := beam.Impulse(s) + beam.ParDo(s, doFnFail, imp) + _, err := executeWithT(context.Background(), t, p) + if err == nil { + t.Fatalf("expected pipeline failure, but got a success") + } + // Job failure state reason isn't communicated with the state change over the API + // so we can't check for a reason here. +} + // TODO: PCollection metrics tests, in particular for element counts, in multi transform pipelines // There's a doubling bug since we re-use the same pcollection IDs for the source & sink, and // don't do any re-writing. diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go index c5a10cae7b4..7dbf8cf87e7 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/stage.go +++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go @@ -75,7 +75,7 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, comps *pipepb.Componen // Do some accounting for the fake bundle. b.Resp = make(chan *fnpb.ProcessBundleResponse, 1) - close(b.Resp) // To + close(b.Resp) // To avoid blocking downstream, since we don't send on this. closed := make(chan struct{}) close(closed) dataReady = closed @@ -160,7 +160,15 @@ progress: // Tentative Data is ready, commit it to the main datastore. slog.Debug("Execute: commiting data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders))) - resp := <-b.Resp + resp, ok := <-b.Resp + // Bundle has failed, fail the job. + // TODO add retries & clean up this logic. Channels are closed by the "runner" transforms. + if !ok && b.Error != "" { + slog.Error("job failed", "error", b.Error, "bundle", rb, "job", j) + j.Failed(fmt.Errorf("bundle failed: %v", b.Error)) + return + } + // Tally metrics immeadiately so they're available before // pipeline termination. unknownIDs := j.ContributeFinalMetrics(resp) diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go index 6e7a9d27aee..9f2801b22ff 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go @@ -239,6 +239,11 @@ func dofn1Counter(ctx context.Context, _ []byte, emit func(int64)) { beam.NewCounter(ns, "count").Inc(ctx, 1) } +func doFnFail(ctx context.Context, _ []byte, emit func(int64)) error { + beam.NewCounter(ns, "count").Inc(ctx, 1) + return fmt.Errorf("doFnFail: failing as intended") +} + func combineIntSum(a, b int64) int64 { return a + b } diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go index 5ca4eb9fd4c..8d45c1155ff 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go @@ -47,6 +47,7 @@ func init() { register.Function3x0(dofnGBK3) register.Function3x0(dofn1Counter) register.Function2x0(dofnSink) + register.Function3x1(doFnFail) register.Function2x1(combineIntSum) diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go index 77f7094a97e..58cc813d710 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go @@ -56,6 +56,8 @@ type B struct { Resp chan *fnpb.ProcessBundleResponse SinkToPCollection map[string]string + + Error string // Set on Respond. } // Init initializes the bundle's internal state for waiting on all @@ -86,6 +88,11 @@ func (b *B) LogValue() slog.Value { } func (b *B) Respond(resp *fnpb.InstructionResponse) { + if resp.GetError() != "" { + b.Error = resp.GetError() + close(b.Resp) + return + } b.Resp <- resp.GetProcessBundle() } diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index 3bd82807f21..9767dec068f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -233,19 +233,19 @@ func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProces // // Requests come from the runner, and are sent to the client in the SDK. func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { - done := make(chan bool) + done := make(chan struct{}) go func() { for { resp, err := ctrl.Recv() if err == io.EOF { slog.Debug("ctrl.Recv finished; marking done", "worker", wk) - done <- true // means stream is finished + done <- struct{}{} // means stream is finished return } if err != nil { switch status.Code(err) { case codes.Canceled: - done <- true // means stream is finished + done <- struct{}{} // means stream is finished return default: slog.Error("ctrl.Recv failed", err, "worker", wk) @@ -258,9 +258,8 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok { // TODO. Better pipeline error handling. if resp.Error != "" { - slog.LogAttrs(context.TODO(), slog.LevelError, "ctrl.Recv pipeline error", + slog.LogAttrs(ctrl.Context(), slog.LevelError, "ctrl.Recv pipeline error", slog.String("error", resp.GetError())) - panic(resp.GetError()) } b.Respond(resp) } else { @@ -270,13 +269,18 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { } }() - for req := range wk.InstReqs { - ctrl.Send(req) + for { + select { + case req := <-wk.InstReqs: + ctrl.Send(req) + case <-ctrl.Context().Done(): + slog.Debug("Control context canceled") + return ctrl.Context().Err() + case <-done: + slog.Debug("Control done") + return nil + } } - slog.Debug("ctrl.Send finished waiting on done") - <-done - slog.Debug("Control done") - return nil } // Data relays elements and timer bytes to SDKs and back again, coordinated via
