This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7f23c420010 [Prism] Improve logging messages and levels. (#36242)
7f23c420010 is described below
commit 7f23c420010393003fd135ccce42e867c03cf511
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Sep 23 15:36:11 2025 -0400
[Prism] Improve logging messages and levels. (#36242)
* Improve logging messages. No functional changes.
* Add some more logging messages.
---
.../prism/internal/engine/elementmanager.go | 84 ++++++++++++----------
sdks/go/pkg/beam/runners/prism/internal/execute.go | 7 +-
.../pkg/beam/runners/prism/internal/preprocess.go | 14 ++++
sdks/go/pkg/beam/runners/prism/internal/stage.go | 4 +-
.../beam/runners/prism/internal/worker/worker.go | 4 +-
5 files changed, 66 insertions(+), 47 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 0ef7ed4ea44..6af030f3622 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -260,7 +260,7 @@ func NewElementManager(config Config) *ElementManager {
// AddStage adds a stage to this element manager, connecting its PCollections
and
// nodes to the watermark propagation graph.
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string,
sides []LinkID) {
- slog.Debug("AddStage", slog.String("ID", ID), slog.Any("inputs",
inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
+ slog.Debug("em.AddStage", slog.String("ID", ID), slog.Any("inputs",
inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
ss := makeStageState(ID, inputIDs, outputIDs, sides)
em.stages[ss.ID] = ss
@@ -504,6 +504,40 @@ func (em *ElementManager) Bundles(ctx context.Context,
upstreamCancelFn context.
return runStageCh
}
+// DumpStages puts all the stage information into a string and returns it.
+func (em *ElementManager) DumpStages() string {
+ var stageState []string
+ ids := maps.Keys(em.stages)
+ if em.testStreamHandler != nil {
+ stageState = append(stageState, fmt.Sprintf("TestStreamHandler:
completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v
\n",
+ em.testStreamHandler.completed,
em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events),
em.testStreamHandler.events, em.testStreamHandler.processingTime,
mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
+ } else {
+ stageState = append(stageState, fmt.Sprintf("ElementManager
Now: %v processingTimeEvents: %v injectedBundles: %v\n",
em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
+ }
+ sort.Strings(ids)
+ for _, id := range ids {
+ ss := em.stages[id]
+ inW := ss.InputWatermark()
+ outW := ss.OutputWatermark()
+ upPCol, upW := ss.UpstreamWatermark()
+ upS := em.pcolParents[upPCol]
+ if upS == "" {
+ upS = "IMPULSE " // (extra spaces to allow print to
align better.)
+ }
+ stageState = append(stageState, fmt.Sprintln(id, "watermark
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending,
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle",
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts",
ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle,
"pttEvents", ss.processingTimeTimers.toFire, "bundlesToInject",
ss.bundlesToInject))
+
+ var outputConsumers, sideConsumers []string
+ for _, col := range ss.outputIDs {
+ outputConsumers = append(outputConsumers,
em.consumers[col]...)
+ for _, l := range em.sideConsumers[col] {
+ sideConsumers = append(sideConsumers, l.Global)
+ }
+ }
+ stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v
outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides,
ss.outputIDs, outputConsumers, sideConsumers))
+ }
+ return strings.Join(stageState, "")
+}
+
// checkForQuiescence sees if this element manager is no longer able to do any
pending work or make progress.
//
// Quiescence can happen if there are no inprogress bundles, and there are no
further watermark refreshes, which
@@ -524,9 +558,9 @@ func (em *ElementManager) checkForQuiescence(advanced
set[string]) error {
// If there are changed stages that need a watermarks refresh,
// we aren't yet stuck.
v := em.livePending.Load()
- slog.Debug("Bundles: nothing in progress after advance",
- slog.Any("advanced", advanced),
- slog.Int("changeCount", len(em.changedStages)),
+ slog.Debug("Bundles: nothing in progress after advance, but
some stages need a watermark refresh",
+ slog.Any("mayProgress", advanced),
+ slog.Any("needRefresh", em.changedStages),
slog.Int64("pendingElementCount", v),
)
return nil
@@ -569,36 +603,7 @@ func (em *ElementManager) checkForQuiescence(advanced
set[string]) error {
// Jobs must never get stuck so this indicates a bug in prism to be
investigated.
slog.Debug("Bundles: nothing in progress and no refreshes",
slog.Int64("pendingElementCount", v))
- var stageState []string
- ids := maps.Keys(em.stages)
- if em.testStreamHandler != nil {
- stageState = append(stageState, fmt.Sprintf("TestStreamHandler:
completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v
\n",
- em.testStreamHandler.completed,
em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events),
em.testStreamHandler.events, em.testStreamHandler.processingTime,
mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
- } else {
- stageState = append(stageState, fmt.Sprintf("ElementManager
Now: %v processingTimeEvents: %v injectedBundles: %v\n",
em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
- }
- sort.Strings(ids)
- for _, id := range ids {
- ss := em.stages[id]
- inW := ss.InputWatermark()
- outW := ss.OutputWatermark()
- upPCol, upW := ss.UpstreamWatermark()
- upS := em.pcolParents[upPCol]
- if upS == "" {
- upS = "IMPULSE " // (extra spaces to allow print to
align better.)
- }
- stageState = append(stageState, fmt.Sprintln(id, "watermark
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending,
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle",
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts",
ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle,
"pttEvents", ss.processingTimeTimers.toFire, "bundlesToInject",
ss.bundlesToInject))
-
- var outputConsumers, sideConsumers []string
- for _, col := range ss.outputIDs {
- outputConsumers = append(outputConsumers,
em.consumers[col]...)
- for _, l := range em.sideConsumers[col] {
- sideConsumers = append(sideConsumers, l.Global)
- }
- }
- stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v
outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides,
ss.outputIDs, outputConsumers, sideConsumers))
- }
- return errors.Errorf("nothing in progress and no refreshes with non
zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
+ return errors.Errorf("nothing in progress and no refreshes with non
zero pending elements: %v\n%v", v, em.DumpStages())
}
// InputForBundle returns pre-allocated data for the given bundle, encoding
the elements using
@@ -864,7 +869,9 @@ func (em *ElementManager) PersistBundle(rb RunBundle,
col2Coders map[string]PCol
}
consumers := em.consumers[output]
sideConsumers := em.sideConsumers[output]
- slog.Debug("PersistBundle: bundle has downstream consumers.",
"bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers,
"sideConsumers", sideConsumers)
+ slog.Debug("PersistBundle: bundle has downstream consumers.",
"bundle", rb,
+ slog.Int("newPending", len(newPending)), "consumers",
consumers, "sideConsumers", sideConsumers,
+ "pendingDelta", len(newPending)*len(consumers))
for _, sID := range consumers {
consumer := em.stages[sID]
count := consumer.AddPending(em, newPending)
@@ -1576,6 +1583,7 @@ func (ss *stageState) buildTriggeredBundle(em
*ElementManager, key string, win t
nil,
panesInBundle,
)
+ slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID",
rb.BundleID, "size", len(toProcess))
ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
@@ -1688,6 +1696,7 @@ func (ss *stageState) startEventTimeBundle(watermark
mtime.Time, genBundID func(
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, panesInBundle)
+ slog.Debug("started an event time bundle", "stageID", ss.ID,
"bundleID", bundID, "bundleSize", len(toProcess), "upstreamWatermark",
watermark)
return bundID, true, stillSchedulable, accumulatingPendingAdjustment
}
@@ -1987,6 +1996,8 @@ func (ss *stageState) startProcessingTimeBundle(em
*ElementManager, emNow mtime.
return "", false, stillSchedulable
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys,
holdsInBundle, nil)
+
+ slog.Debug("started a processing time bundle", "stageID", ss.ID,
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
return bundID, true, stillSchedulable
}
@@ -2274,8 +2285,7 @@ func (ss *stageState) bundleReady(em *ElementManager,
emNow mtime.Time) (mtime.T
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
- slog.Any("upstream", upstreamW),
- slog.Any("input", inputW)))
+ slog.Any("upstream == input == previousInput",
inputW)))
return mtime.MinTimestamp, false, ptimeEventsReady,
injectedReady
}
ready := true
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index d0daa991fd2..307ebee5664 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -340,7 +340,6 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
return fmt.Errorf("prism error building stage
%v: \n%w", stage.ID, err)
}
stages[stage.ID] = stage
- j.Logger.Debug("pipelineBuild", slog.Group("stage",
slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
outputs := maps.Keys(stage.OutputsToCoders)
sort.Strings(outputs)
em.AddStage(stage.ID, []string{stage.primaryInput},
outputs, stage.sideInputs)
@@ -381,11 +380,7 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
case rb, ok := <-bundles:
if !ok {
err := eg.Wait()
- var topoAttrs []any
- for _, s := range topo {
- topoAttrs = append(topoAttrs,
slog.Any(s.ID, s))
- }
- j.Logger.Debug("pipeline done!",
slog.String("job", j.String()), slog.Any("error", err), slog.Group("topo",
topoAttrs...))
+ j.Logger.Debug("pipeline done!",
slog.String("job", j.String()), slog.Any("error", err), slog.String("stages",
em.DumpStages()))
return err
}
eg.Go(func() error {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index 4bf7ba4dff4..3311bcced9f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -182,6 +182,20 @@ func (p *preprocessor) preProcessGraph(comps
*pipepb.Components, j *jobservices.
return nil
}
}
+ var stageDetails []any
+ for i, stg := range stages {
+ var transformNames []string
+ for _, tid := range stg.transforms {
+ transformNames = append(transformNames,
comps.GetTransforms()[tid].GetUniqueName())
+ }
+ stageDetails = append(stageDetails,
+ slog.Group(fmt.Sprintf("stage-%03d", i),
+ slog.String("environment", stg.envID),
+ slog.Any("transforms", transformNames),
+ ),
+ )
+ }
+ slog.Debug("preProcessGraph: all stages and transforms",
stageDetails...)
return stages
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 101d7a8dc0f..918ea45fcd6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -174,7 +174,7 @@ func (s *stage) Execute(ctx context.Context, j
*jobservices.Job, wk *worker.W, c
s.prepareSides(b, rb.Watermark)
- slog.Debug("Execute: processing", "bundle", rb)
+ slog.Debug("Execute: sdk worker transform(s)", "bundle", rb)
defer b.Cleanup(wk)
dataReady = b.ProcessOn(ctx, wk)
default:
@@ -354,7 +354,7 @@ progress:
slog.Error("SDK Error from bundle finalization",
"bundle", rb, "error", err.Error())
panic(err)
}
- slog.Info("finalized bundle", "bundle", rb)
+ slog.Debug("finalized bundle", "bundle", rb)
}
b.OutputData = engine.TentativeData{} // Clear the data.
return nil
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index c962aa4bff6..5668449f6c9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -386,7 +386,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
for _, d := range resp.GetData() {
cr, ok :=
wk.activeInstructions[d.GetInstructionId()]
if !ok {
- slog.Info("data.Recv data for unknown
bundle", "response", resp)
+ slog.Debug("data.Recv data for unknown
bundle", "response", resp)
continue
}
// Received data is always for an active
ProcessBundle instruction
@@ -405,7 +405,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
for _, t := range resp.GetTimers() {
cr, ok :=
wk.activeInstructions[t.GetInstructionId()]
if !ok {
- slog.Info("data.Recv timers for unknown
bundle", "response", resp)
+ slog.Debug("data.Recv timers for
unknown bundle", "response", resp)
continue
}
// Received data is always for an active
ProcessBundle instruction