This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch includeFailures
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5111afe316c03c49a0fe6f90cedf6170f5ba87ce
Author: Robert Burke <[email protected]>
AuthorDate: Wed Jun 28 17:34:59 2023 +0200

    Clean up user failure logic to not panic.
---
 .../beam/runners/prism/internal/execute_test.go    | 14 ++++++++++++
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 12 ++++++++--
 .../pkg/beam/runners/prism/internal/testdofns.go   |  5 +++++
 .../beam/runners/prism/internal/testdofns_test.go  |  1 +
 .../beam/runners/prism/internal/worker/bundle.go   |  7 ++++++
 .../beam/runners/prism/internal/worker/worker.go   | 26 +++++++++++++---------
 6 files changed, 52 insertions(+), 13 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
index 3f353f0626f..2da3972ff10 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -418,6 +418,20 @@ func TestRunner_Metrics(t *testing.T) {
        })
 }
 
+func TestFailure(t *testing.T) {
+       initRunner(t)
+
+       p, s := beam.NewPipelineWithRoot()
+       imp := beam.Impulse(s)
+       beam.ParDo(s, doFnFail, imp)
+       _, err := executeWithT(context.Background(), t, p)
+       if err == nil {
+               t.Fatalf("expected pipeline failure, but got a success")
+       }
+       // Job failure state reason isn't communicated with the state change over the API
+       // so we can't check for a reason here.
+}
+
 // TODO: PCollection metrics tests, in particular for element counts, in multi transform pipelines
 // There's a doubling bug since we re-use the same pcollection IDs for the source & sink, and
 // don't do any re-writing.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index c5a10cae7b4..7dbf8cf87e7 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -75,7 +75,7 @@ func (s *stage) Execute(j *jobservices.Job, wk *worker.W, comps *pipepb.Componen
 
                // Do some accounting for the fake bundle.
                b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
-               close(b.Resp) // To
+               close(b.Resp) // To avoid blocking downstream, since we don't send on this.
                closed := make(chan struct{})
                close(closed)
                dataReady = closed
@@ -160,7 +160,15 @@ progress:
        // Tentative Data is ready, commit it to the main datastore.
        slog.Debug("Execute: commiting data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders)))
 
-       resp := <-b.Resp
+       resp, ok := <-b.Resp
+       // Bundle has failed, fail the job.
+       // TODO add retries & clean up this logic. Channels are closed by the "runner" transforms.
+       if !ok && b.Error != "" {
+               slog.Error("job failed", "error", b.Error, "bundle", rb, "job", j)
+               j.Failed(fmt.Errorf("bundle failed: %v", b.Error))
+               return
+       }
+
        // Tally metrics immeadiately so they're available before
        // pipeline termination.
        unknownIDs := j.ContributeFinalMetrics(resp)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
index 6e7a9d27aee..9f2801b22ff 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns.go
@@ -239,6 +239,11 @@ func dofn1Counter(ctx context.Context, _ []byte, emit func(int64)) {
        beam.NewCounter(ns, "count").Inc(ctx, 1)
 }
 
+func doFnFail(ctx context.Context, _ []byte, emit func(int64)) error {
+       beam.NewCounter(ns, "count").Inc(ctx, 1)
+       return fmt.Errorf("doFnFail: failing as intended")
+}
+
 func combineIntSum(a, b int64) int64 {
        return a + b
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
index 5ca4eb9fd4c..8d45c1155ff 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
@@ -47,6 +47,7 @@ func init() {
        register.Function3x0(dofnGBK3)
        register.Function3x0(dofn1Counter)
        register.Function2x0(dofnSink)
+       register.Function3x1(doFnFail)
 
        register.Function2x1(combineIntSum)
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 77f7094a97e..58cc813d710 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -56,6 +56,8 @@ type B struct {
        Resp chan *fnpb.ProcessBundleResponse
 
        SinkToPCollection map[string]string
+
+       Error string // Set on Respond.
 }
 
 // Init initializes the bundle's internal state for waiting on all
@@ -86,6 +88,11 @@ func (b *B) LogValue() slog.Value {
 }
 
 func (b *B) Respond(resp *fnpb.InstructionResponse) {
+       if resp.GetError() != "" {
+               b.Error = resp.GetError()
+               close(b.Resp)
+               return
+       }
        b.Resp <- resp.GetProcessBundle()
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index 3bd82807f21..9767dec068f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -233,19 +233,19 @@ func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProces
 //
 // Requests come from the runner, and are sent to the client in the SDK.
 func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
-       done := make(chan bool)
+       done := make(chan struct{})
        go func() {
                for {
                        resp, err := ctrl.Recv()
                        if err == io.EOF {
                                slog.Debug("ctrl.Recv finished; marking done", "worker", wk)
-                               done <- true // means stream is finished
+                               done <- struct{}{} // means stream is finished
                                return
                        }
                        if err != nil {
                                switch status.Code(err) {
                                case codes.Canceled:
-                                       done <- true // means stream is finished
+                                       done <- struct{}{} // means stream is finished
                                        return
                                default:
                                        slog.Error("ctrl.Recv failed", err, "worker", wk)
@@ -258,9 +258,8 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
                        if b, ok := wk.activeInstructions[resp.GetInstructionId()]; ok {
                                // TODO. Better pipeline error handling.
                                if resp.Error != "" {
-                                       slog.LogAttrs(context.TODO(), slog.LevelError, "ctrl.Recv pipeline error",
+                                       slog.LogAttrs(ctrl.Context(), slog.LevelError, "ctrl.Recv pipeline error",
                                                slog.String("error", resp.GetError()))
-                                       panic(resp.GetError())
                                }
                                b.Respond(resp)
                        } else {
@@ -270,13 +269,18 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
                }
        }()
 
-       for req := range wk.InstReqs {
-               ctrl.Send(req)
+       for {
+               select {
+               case req := <-wk.InstReqs:
+                       ctrl.Send(req)
+               case <-ctrl.Context().Done():
+                       slog.Debug("Control context canceled")
+                       return ctrl.Context().Err()
+               case <-done:
+                       slog.Debug("Control done")
+                       return nil
+               }
        }
-       slog.Debug("ctrl.Send finished waiting on done")
-       <-done
-       slog.Debug("Control done")
-       return nil
 }
 
 // Data relays elements and timer bytes to SDKs and back again, coordinated via

Reply via email to