This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e2d62462cd6 [Prism] Terminate Job with CancelFn instead of panic (#31599)
e2d62462cd6 is described below

commit e2d62462cd622db8b008e7e36e2343b57e90d4f1
Author: Damon <[email protected]>
AuthorDate: Fri Jun 14 20:12:47 2024 -0700

    [Prism] Terminate Job with CancelFn instead of panic (#31599)
    
    * Refactor elementmanager Bundles with upstream CancelCauseFunc
    
    * Fix minor edits
---
 .../prism/internal/engine/elementmanager.go        | 23 ++++++++++++----------
 .../prism/internal/engine/elementmanager_test.go   | 12 +++++++----
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  2 +-
 3 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 76c60e810d4..2c4e08bcd09 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -34,6 +34,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "golang.org/x/exp/maps"
        "golang.org/x/exp/slog"
 )
@@ -290,7 +291,7 @@ func (rb RunBundle) LogValue() slog.Value {
 // Bundles is the core execution loop. It produces a sequences of bundles able 
to be executed.
 // The returned channel is closed when the context is canceled, or there are 
no pending elements
 // remaining.
-func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() 
string) <-chan RunBundle {
+func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn 
context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
        runStageCh := make(chan RunBundle)
        ctx, cancelFn := context.WithCancelCause(ctx)
        go func() {
@@ -384,7 +385,9 @@ func (em *ElementManager) Bundles(ctx context.Context, 
nextBundID func() string)
                                        }
                                }
                        }
-                       em.checkForQuiescence(advanced)
+                       if err := em.checkForQuiescence(advanced); err != nil {
+                               upstreamCancelFn(err)
+                       }
                }
        }()
        return runStageCh
@@ -400,11 +403,11 @@ func (em *ElementManager) Bundles(ctx context.Context, 
nextBundID func() string)
 // executing off the next TestStream event.
 //
 // Must be called while holding em.refreshCond.L.
-func (em *ElementManager) checkForQuiescence(advanced set[string]) {
+func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
        defer em.refreshCond.L.Unlock()
        if len(em.inprogressBundles) > 0 {
                // If there are bundles in progress, then there may be 
watermark refreshes when they terminate.
-               return
+               return nil
        }
        if len(em.watermarkRefreshes) > 0 {
                // If there are watermarks to refresh, we aren't yet stuck.
@@ -414,12 +417,12 @@ func (em *ElementManager) checkForQuiescence(advanced 
set[string]) {
                        slog.Int("refreshCount", len(em.watermarkRefreshes)),
                        slog.Int64("pendingElementCount", v),
                )
-               return
+               return nil
        }
        if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 {
                // If there's no test stream involved, and processing time 
events exist, then
                // it's only a matter of time.
-               return
+               return nil
        }
        // The job has quiesced!
 
@@ -433,12 +436,12 @@ func (em *ElementManager) checkForQuiescence(advanced 
set[string]) {
                // Note: it's a prism bug if test stream never causes a refresh 
to occur for a given event.
                // It's not correct to move to the next event if no refreshes 
would occur.
                if len(em.watermarkRefreshes) > 0 {
-                       return
+                       return nil
                } else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
                        // It's impossible to fully control processing time SDK 
side handling for processing time
                        // Runner side, so we specialize refresh handling here 
to avoid spuriously getting stuck.
                        em.watermarkRefreshes.insert(em.testStreamHandler.ID)
-                       return
+                       return nil
                }
                // If there are no refreshes,  then there's no mechanism to 
make progress, so it's time to fast fail.
        }
@@ -446,7 +449,7 @@ func (em *ElementManager) checkForQuiescence(advanced 
set[string]) {
        v := em.livePending.Load()
        if v == 0 {
                // Since there are no further pending elements, the job will be 
terminating successfully.
-               return
+               return nil
        }
        // The job is officially stuck. Fail fast and produce debugging 
information.
        // Jobs must never get stuck so this indicates a bug in prism to be 
investigated.
@@ -469,7 +472,7 @@ func (em *ElementManager) checkForQuiescence(advanced 
set[string]) {
                upS := em.pcolParents[upPCol]
                stageState = append(stageState, fmt.Sprintln(id, "watermark 
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, 
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", 
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", 
ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, 
"pttEvents", ss.processingTimeTimers.toFire))
        }
-       panic(fmt.Sprintf("nothing in progress and no refreshes with non zero 
pending elements: %v\n%v", v, strings.Join(stageState, "")))
+       return errors.Errorf("nothing in progress and no refreshes with non 
zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
 }
 
 // InputForBundle returns pre-allocated data for the given bundle, encoding 
the elements using
diff --git 
a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
index 275dd790d2b..d5904b13fb8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
@@ -316,6 +316,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
 
 func TestElementManager(t *testing.T) {
        t.Run("impulse", func(t *testing.T) {
+               ctx, cancelFn := context.WithCancelCause(context.Background())
                em := NewElementManager(Config{})
                em.AddStage("impulse", nil, []string{"output"}, nil)
                em.AddStage("dofn", []string{"output"}, nil, nil)
@@ -327,7 +328,7 @@ func TestElementManager(t *testing.T) {
                }
 
                var i int
-               ch := em.Bundles(context.Background(), func() string {
+               ch := em.Bundles(ctx, cancelFn, func() string {
                        defer func() { i++ }()
                        return fmt.Sprintf("%v", i)
                })
@@ -371,6 +372,7 @@ func TestElementManager(t *testing.T) {
        }
 
        t.Run("dofn", func(t *testing.T) {
+               ctx, cancelFn := context.WithCancelCause(context.Background())
                em := NewElementManager(Config{})
                em.AddStage("impulse", nil, []string{"input"}, nil)
                em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
@@ -378,7 +380,7 @@ func TestElementManager(t *testing.T) {
                em.Impulse("impulse")
 
                var i int
-               ch := em.Bundles(context.Background(), func() string {
+               ch := em.Bundles(ctx, cancelFn, func() string {
                        defer func() { i++ }()
                        t.Log("generating bundle", i)
                        return fmt.Sprintf("%v", i)
@@ -422,6 +424,7 @@ func TestElementManager(t *testing.T) {
        })
 
        t.Run("side", func(t *testing.T) {
+               ctx, cancelFn := context.WithCancelCause(context.Background())
                em := NewElementManager(Config{})
                em.AddStage("impulse", nil, []string{"input"}, nil)
                em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
@@ -429,7 +432,7 @@ func TestElementManager(t *testing.T) {
                em.Impulse("impulse")
 
                var i int
-               ch := em.Bundles(context.Background(), func() string {
+               ch := em.Bundles(ctx, cancelFn, func() string {
                        defer func() { i++ }()
                        t.Log("generating bundle", i)
                        return fmt.Sprintf("%v", i)
@@ -473,13 +476,14 @@ func TestElementManager(t *testing.T) {
                }
        })
        t.Run("residual", func(t *testing.T) {
+               ctx, cancelFn := context.WithCancelCause(context.Background())
                em := NewElementManager(Config{})
                em.AddStage("impulse", nil, []string{"input"}, nil)
                em.AddStage("dofn", []string{"input"}, nil, nil)
                em.Impulse("impulse")
 
                var i int
-               ch := em.Bundles(context.Background(), func() string {
+               ch := em.Bundles(ctx, cancelFn, func() string {
                        defer func() { i++ }()
                        t.Log("generating bundle", i)
                        return fmt.Sprintf("%v", i)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 7276b725a7e..08ab9f687c5 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -335,7 +335,7 @@ func executePipeline(ctx context.Context, wks 
map[string]*worker.W, j *jobservic
        eg.SetLimit(8)
 
        var instID uint64
-       bundles := em.Bundles(egctx, func() string {
+       bundles := em.Bundles(egctx, j.CancelFn, func() string {
                return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
        })
        for {

Reply via email to