This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ec65d642ccc [Prism] Fix gRPC deadline exceeded errors during bundle failure by passing errgroup context (#38472)
ec65d642ccc is described below

commit ec65d642ccc241d992ebfcca684f02864556aa7c
Author: Shunping Huang <[email protected]>
AuthorDate: Tue May 12 15:45:58 2026 -0400

    [Prism] Fix gRPC deadline exceeded errors during bundle failure by passing errgroup context (#38472)
    
    * Add go test to reproduce the deadline exceeded errors when a dofn fails
    
    * Add python unit test to reproduce it.
    
    * Change the context to egctx so a bundle failure will cancel other bundle execution.
    
    * Fix lints.
    
    * Remove unused import.
    
    * Move test to a test class that uses built prism during vr test.
    
    * Remove the new python test due to flakiness.
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  6 +++++-
 .../beam/runners/prism/internal/execute_test.go    | 22 ++++++++++++++++++++++
 .../beam/runners/prism/internal/testdofns_test.go  |  5 +++++
 3 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 853b7974479..f6e148f9f3f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -376,7 +376,11 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
                        eg.Go(func() error {
                                s := stages[rb.StageID]
                                wk := wks[s.envID]
-                               if err := s.Execute(ctx, j, wk, comps, em, rb); 
err != nil {
+                               // Pass egctx instead of the parent ctx so that 
when any bundle fails,
+                               // the errgroup cancels egctx and all other 
concurrent bundle execution
+                               // goroutines immediately detect cancellation 
and abort. This prevents
+                               // eg.Wait() from blocking indefinitely and 
allows prompt error reporting.
+                               if err := s.Execute(egctx, j, wk, comps, em, 
rb); err != nil {
                                        // Ensure we clean up on bundle failure
                                        j.Logger.Error("Bundle Failed.", 
slog.Any("error", err))
                                        em.FailBundle(rb)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
index 29fccaeb238..2bb73f20e20 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -519,6 +519,28 @@ func TestFailure(t *testing.T) {
        }
 }
 
+func TestFailureHang(t *testing.T) {
+       initRunner(t)
+
+       p, s := beam.NewPipelineWithRoot()
+       imp := beam.Impulse(s)
+       col1 := beam.ParDo(s, doFnBlock, imp)
+       col2 := beam.ParDo(s, doFnFail, imp)
+       beam.ParDo(s, &int64Check{Name: "block", Want: []int{}}, col1)
+       beam.ParDo(s, &int64Check{Name: "fail", Want: []int{}}, col2)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+       defer cancel()
+
+       _, err := executeWithT(ctx, t, p)
+       if err == nil {
+               t.Fatalf("expected pipeline failure, but got a success")
+       }
+       if want := "doFnFail: failing as intended"; 
!strings.Contains(err.Error(), want) {
+               t.Fatalf("expected pipeline failure with %q, but was %v", want, 
err)
+       }
+}
+
 func TestRunner_Passert(t *testing.T) {
        initRunner(t)
        tests := []struct {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
index 334d74fcae1..d21ccd53afd 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
@@ -59,6 +59,7 @@ func init() {
        register.Function3x0(dofn1Counter)
        register.Function2x0(dofnSink)
        register.Function3x1(doFnFail)
+       register.Function3x0(doFnBlock)
 
        register.Function2x1(combineIntSum)
 
@@ -283,6 +284,10 @@ func doFnFail(ctx context.Context, _ []byte, emit 
func(int64)) error {
        return fmt.Errorf("doFnFail: failing as intended")
 }
 
+func doFnBlock(ctx context.Context, _ []byte, emit func(int64)) {
+       <-ctx.Done()
+}
+
 func combineIntSum(a, b int64) int64 {
        return a + b
 }

Reply via email to