This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e2d62462cd6 [Prism] Terminate Job with CancelFn instead of panic (#31599)
e2d62462cd6 is described below
commit e2d62462cd622db8b008e7e36e2343b57e90d4f1
Author: Damon <[email protected]>
AuthorDate: Fri Jun 14 20:12:47 2024 -0700
[Prism] Terminate Job with CancelFn instead of panic (#31599)
* Refactor elementmanager Bundles with upstream CancelCauseFunc
* Fix minor edits
---
.../prism/internal/engine/elementmanager.go | 23 ++++++++++++----------
.../prism/internal/engine/elementmanager_test.go | 12 +++++++----
sdks/go/pkg/beam/runners/prism/internal/execute.go | 2 +-
3 files changed, 22 insertions(+), 15 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 76c60e810d4..2c4e08bcd09 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -34,6 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
)
@@ -290,7 +291,7 @@ func (rb RunBundle) LogValue() slog.Value {
// Bundles is the core execution loop. It produces a sequences of bundles able
to be executed.
// The returned channel is closed when the context is canceled, or there are
no pending elements
// remaining.
-func (em *ElementManager) Bundles(ctx context.Context, nextBundID func()
string) <-chan RunBundle {
+func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn
context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
runStageCh := make(chan RunBundle)
ctx, cancelFn := context.WithCancelCause(ctx)
go func() {
@@ -384,7 +385,9 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
}
}
}
- em.checkForQuiescence(advanced)
+ if err := em.checkForQuiescence(advanced); err != nil {
+ upstreamCancelFn(err)
+ }
}
}()
return runStageCh
@@ -400,11 +403,11 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
// executing off the next TestStream event.
//
// Must be called while holding em.refreshCond.L.
-func (em *ElementManager) checkForQuiescence(advanced set[string]) {
+func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
defer em.refreshCond.L.Unlock()
if len(em.inprogressBundles) > 0 {
// If there are bundles in progress, then there may be
watermark refreshes when they terminate.
- return
+ return nil
}
if len(em.watermarkRefreshes) > 0 {
// If there are watermarks to refresh, we aren't yet stuck.
@@ -414,12 +417,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
slog.Int("refreshCount", len(em.watermarkRefreshes)),
slog.Int64("pendingElementCount", v),
)
- return
+ return nil
}
if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 {
// If there's no test stream involved, and processing time
events exist, then
// it's only a matter of time.
- return
+ return nil
}
// The job has quiesced!
@@ -433,12 +436,12 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
// Note: it's a prism bug if test stream never causes a refresh
to occur for a given event.
// It's not correct to move to the next event if no refreshes
would occur.
if len(em.watermarkRefreshes) > 0 {
- return
+ return nil
} else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
// It's impossible to fully control processing time SDK
side handling for processing time
// Runner side, so we specialize refresh handling here
to avoid spuriously getting stuck.
em.watermarkRefreshes.insert(em.testStreamHandler.ID)
- return
+ return nil
}
// If there are no refreshes, then there's no mechanism to
make progress, so it's time to fast fail.
}
@@ -446,7 +449,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
v := em.livePending.Load()
if v == 0 {
// Since there are no further pending elements, the job will be
terminating successfully.
- return
+ return nil
}
// The job is officially stuck. Fail fast and produce debugging
information.
// Jobs must never get stuck so this indicates a bug in prism to be
investigated.
@@ -469,7 +472,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
upS := em.pcolParents[upPCol]
stageState = append(stageState, fmt.Sprintln(id, "watermark
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending,
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle",
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts",
ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle,
"pttEvents", ss.processingTimeTimers.toFire))
}
- panic(fmt.Sprintf("nothing in progress and no refreshes with non zero
pending elements: %v\n%v", v, strings.Join(stageState, "")))
+ return errors.Errorf("nothing in progress and no refreshes with non
zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
}
// InputForBundle returns pre-allocated data for the given bundle, encoding
the elements using
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
index 275dd790d2b..d5904b13fb8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager_test.go
@@ -316,6 +316,7 @@ func TestStageState_updateWatermarks(t *testing.T) {
func TestElementManager(t *testing.T) {
t.Run("impulse", func(t *testing.T) {
+ ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"output"}, nil)
em.AddStage("dofn", []string{"output"}, nil, nil)
@@ -327,7 +328,7 @@ func TestElementManager(t *testing.T) {
}
var i int
- ch := em.Bundles(context.Background(), func() string {
+ ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
return fmt.Sprintf("%v", i)
})
@@ -371,6 +372,7 @@ func TestElementManager(t *testing.T) {
}
t.Run("dofn", func(t *testing.T) {
+ ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
@@ -378,7 +380,7 @@ func TestElementManager(t *testing.T) {
em.Impulse("impulse")
var i int
- ch := em.Bundles(context.Background(), func() string {
+ ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
@@ -422,6 +424,7 @@ func TestElementManager(t *testing.T) {
})
t.Run("side", func(t *testing.T) {
+ ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn1", []string{"input"}, []string{"output"}, nil)
@@ -429,7 +432,7 @@ func TestElementManager(t *testing.T) {
em.Impulse("impulse")
var i int
- ch := em.Bundles(context.Background(), func() string {
+ ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
@@ -473,13 +476,14 @@ func TestElementManager(t *testing.T) {
}
})
t.Run("residual", func(t *testing.T) {
+ ctx, cancelFn := context.WithCancelCause(context.Background())
em := NewElementManager(Config{})
em.AddStage("impulse", nil, []string{"input"}, nil)
em.AddStage("dofn", []string{"input"}, nil, nil)
em.Impulse("impulse")
var i int
- ch := em.Bundles(context.Background(), func() string {
+ ch := em.Bundles(ctx, cancelFn, func() string {
defer func() { i++ }()
t.Log("generating bundle", i)
return fmt.Sprintf("%v", i)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 7276b725a7e..08ab9f687c5 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -335,7 +335,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
eg.SetLimit(8)
var instID uint64
- bundles := em.Bundles(egctx, func() string {
+ bundles := em.Bundles(egctx, j.CancelFn, func() string {
return fmt.Sprintf("inst%03d", atomic.AddUint64(&instID, 1))
})
for {