This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f23c420010 [Prism] Improve logging messages and levels. (#36242)
7f23c420010 is described below

commit 7f23c420010393003fd135ccce42e867c03cf511
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Sep 23 15:36:11 2025 -0400

    [Prism] Improve logging messages and levels. (#36242)
    
    * Improve logging messages. No functional changes.
    
    * Add some more logging messages.
---
 .../prism/internal/engine/elementmanager.go        | 84 ++++++++++++----------
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  7 +-
 .../pkg/beam/runners/prism/internal/preprocess.go  | 14 ++++
 sdks/go/pkg/beam/runners/prism/internal/stage.go   |  4 +-
 .../beam/runners/prism/internal/worker/worker.go   |  4 +-
 5 files changed, 66 insertions(+), 47 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 0ef7ed4ea44..6af030f3622 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -260,7 +260,7 @@ func NewElementManager(config Config) *ElementManager {
 // AddStage adds a stage to this element manager, connecting it's PCollections 
and
 // nodes to the watermark propagation graph.
 func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, 
sides []LinkID) {
-       slog.Debug("AddStage", slog.String("ID", ID), slog.Any("inputs", 
inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
+       slog.Debug("em.AddStage", slog.String("ID", ID), slog.Any("inputs", 
inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
        ss := makeStageState(ID, inputIDs, outputIDs, sides)
 
        em.stages[ss.ID] = ss
@@ -504,6 +504,40 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
        return runStageCh
 }
 
+// DumpStages puts all the stage information into a string and returns it.
+func (em *ElementManager) DumpStages() string {
+       var stageState []string
+       ids := maps.Keys(em.stages)
+       if em.testStreamHandler != nil {
+               stageState = append(stageState, fmt.Sprintf("TestStreamHandler: 
completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v 
\n",
+                       em.testStreamHandler.completed, 
em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), 
em.testStreamHandler.events, em.testStreamHandler.processingTime, 
mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
+       } else {
+               stageState = append(stageState, fmt.Sprintf("ElementManager 
Now: %v processingTimeEvents: %v injectedBundles: %v\n", 
em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
+       }
+       sort.Strings(ids)
+       for _, id := range ids {
+               ss := em.stages[id]
+               inW := ss.InputWatermark()
+               outW := ss.OutputWatermark()
+               upPCol, upW := ss.UpstreamWatermark()
+               upS := em.pcolParents[upPCol]
+               if upS == "" {
+                       upS = "IMPULSE  " // (extra spaces to allow print to 
align better.)
+               }
+               stageState = append(stageState, fmt.Sprintln(id, "watermark 
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, 
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", 
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", 
ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, 
"pttEvents", ss.processingTimeTimers.toFire, "bundlesToInject", 
ss.bundlesToInject))
+
+               var outputConsumers, sideConsumers []string
+               for _, col := range ss.outputIDs {
+                       outputConsumers = append(outputConsumers, 
em.consumers[col]...)
+                       for _, l := range em.sideConsumers[col] {
+                               sideConsumers = append(sideConsumers, l.Global)
+                       }
+               }
+               stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v 
outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides, 
ss.outputIDs, outputConsumers, sideConsumers))
+       }
+       return strings.Join(stageState, "")
+}
+
 // checkForQuiescence sees if this element manager is no longer able to do any 
pending work or make progress.
 //
 // Quiescense can happen if there are no inprogress bundles, and there are no 
further watermark refreshes, which
@@ -524,9 +558,9 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
                // If there are changed stages that need a watermarks refresh,
                // we aren't yet stuck.
                v := em.livePending.Load()
-               slog.Debug("Bundles: nothing in progress after advance",
-                       slog.Any("advanced", advanced),
-                       slog.Int("changeCount", len(em.changedStages)),
+               slog.Debug("Bundles: nothing in progress after advance, but 
some stages need a watermark refresh",
+                       slog.Any("mayProgress", advanced),
+                       slog.Any("needRefresh", em.changedStages),
                        slog.Int64("pendingElementCount", v),
                )
                return nil
@@ -569,36 +603,7 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
        // Jobs must never get stuck so this indicates a bug in prism to be 
investigated.
 
        slog.Debug("Bundles: nothing in progress and no refreshes", 
slog.Int64("pendingElementCount", v))
-       var stageState []string
-       ids := maps.Keys(em.stages)
-       if em.testStreamHandler != nil {
-               stageState = append(stageState, fmt.Sprintf("TestStreamHandler: 
completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v 
\n",
-                       em.testStreamHandler.completed, 
em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), 
em.testStreamHandler.events, em.testStreamHandler.processingTime, 
mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
-       } else {
-               stageState = append(stageState, fmt.Sprintf("ElementManager 
Now: %v processingTimeEvents: %v injectedBundles: %v\n", 
em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
-       }
-       sort.Strings(ids)
-       for _, id := range ids {
-               ss := em.stages[id]
-               inW := ss.InputWatermark()
-               outW := ss.OutputWatermark()
-               upPCol, upW := ss.UpstreamWatermark()
-               upS := em.pcolParents[upPCol]
-               if upS == "" {
-                       upS = "IMPULSE  " // (extra spaces to allow print to 
align better.)
-               }
-               stageState = append(stageState, fmt.Sprintln(id, "watermark 
in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, 
"byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", 
ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", 
ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, 
"pttEvents", ss.processingTimeTimers.toFire, "bundlesToInject", 
ss.bundlesToInject))
-
-               var outputConsumers, sideConsumers []string
-               for _, col := range ss.outputIDs {
-                       outputConsumers = append(outputConsumers, 
em.consumers[col]...)
-                       for _, l := range em.sideConsumers[col] {
-                               sideConsumers = append(sideConsumers, l.Global)
-                       }
-               }
-               stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v 
outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides, 
ss.outputIDs, outputConsumers, sideConsumers))
-       }
-       return errors.Errorf("nothing in progress and no refreshes with non 
zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
+       return errors.Errorf("nothing in progress and no refreshes with non 
zero pending elements: %v\n%v", v, em.DumpStages())
 }
 
 // InputForBundle returns pre-allocated data for the given bundle, encoding 
the elements using
@@ -864,7 +869,9 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
                }
                consumers := em.consumers[output]
                sideConsumers := em.sideConsumers[output]
-               slog.Debug("PersistBundle: bundle has downstream consumers.", 
"bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers, 
"sideConsumers", sideConsumers)
+               slog.Debug("PersistBundle: bundle has downstream consumers.", 
"bundle", rb,
+                       slog.Int("newPending", len(newPending)), "consumers", 
consumers, "sideConsumers", sideConsumers,
+                       "pendingDelta", len(newPending)*len(consumers))
                for _, sID := range consumers {
                        consumer := em.stages[sID]
                        count := consumer.AddPending(em, newPending)
@@ -1576,6 +1583,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
                nil,
                panesInBundle,
        )
+       slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID", 
rb.BundleID, "size", len(toProcess))
 
        ss.bundlesToInject = append(ss.bundlesToInject, rb)
        // Bundle is marked in progress here to prevent a race condition.
@@ -1688,6 +1696,7 @@ func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func(
        }
 
        bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, panesInBundle)
+       slog.Debug("started an event time bundle", "stageID", ss.ID, 
"bundleID", bundID, "bundleSize", len(toProcess), "upstreamWatermark", 
watermark)
 
        return bundID, true, stillSchedulable, accumulatingPendingAdjustment
 }
@@ -1987,6 +1996,8 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
                return "", false, stillSchedulable
        }
        bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, 
holdsInBundle, nil)
+
+       slog.Debug("started a processing time bundle", "stageID", ss.ID, 
"bundleID", bundID, "size", len(toProcess), "emNow", emNow)
        return bundID, true, stillSchedulable
 }
 
@@ -2274,8 +2285,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
                slog.Debug("bundleReady: unchanged upstream watermark",
                        slog.String("stage", ss.ID),
                        slog.Group("watermark",
-                               slog.Any("upstream", upstreamW),
-                               slog.Any("input", inputW)))
+                               slog.Any("upstream == input == previousInput", 
inputW)))
                return mtime.MinTimestamp, false, ptimeEventsReady, 
injectedReady
        }
        ready := true
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index d0daa991fd2..307ebee5664 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -340,7 +340,6 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
                                return fmt.Errorf("prism error building stage 
%v: \n%w", stage.ID, err)
                        }
                        stages[stage.ID] = stage
-                       j.Logger.Debug("pipelineBuild", slog.Group("stage", 
slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
                        outputs := maps.Keys(stage.OutputsToCoders)
                        sort.Strings(outputs)
                        em.AddStage(stage.ID, []string{stage.primaryInput}, 
outputs, stage.sideInputs)
@@ -381,11 +380,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
                case rb, ok := <-bundles:
                        if !ok {
                                err := eg.Wait()
-                               var topoAttrs []any
-                               for _, s := range topo {
-                                       topoAttrs = append(topoAttrs, 
slog.Any(s.ID, s))
-                               }
-                               j.Logger.Debug("pipeline done!", 
slog.String("job", j.String()), slog.Any("error", err), slog.Group("topo", 
topoAttrs...))
+                               j.Logger.Debug("pipeline done!", 
slog.String("job", j.String()), slog.Any("error", err), slog.String("stages", 
em.DumpStages()))
                                return err
                        }
                        eg.Go(func() error {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index 4bf7ba4dff4..3311bcced9f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -182,6 +182,20 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components, j *jobservices.
                        return nil
                }
        }
+       var stageDetails []any
+       for i, stg := range stages {
+               var transformNames []string
+               for _, tid := range stg.transforms {
+                       transformNames = append(transformNames, 
comps.GetTransforms()[tid].GetUniqueName())
+               }
+               stageDetails = append(stageDetails,
+                       slog.Group(fmt.Sprintf("stage-%03d", i),
+                               slog.String("environment", stg.envID),
+                               slog.Any("transforms", transformNames),
+                       ),
+               )
+       }
+       slog.Debug("preProcessGraph: all stages and transforms", 
stageDetails...)
        return stages
 }
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 101d7a8dc0f..918ea45fcd6 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -174,7 +174,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c
 
                s.prepareSides(b, rb.Watermark)
 
-               slog.Debug("Execute: processing", "bundle", rb)
+               slog.Debug("Execute: sdk worker transform(s)", "bundle", rb)
                defer b.Cleanup(wk)
                dataReady = b.ProcessOn(ctx, wk)
        default:
@@ -354,7 +354,7 @@ progress:
                        slog.Error("SDK Error from bundle finalization", 
"bundle", rb, "error", err.Error())
                        panic(err)
                }
-               slog.Info("finalized bundle", "bundle", rb)
+               slog.Debug("finalized bundle", "bundle", rb)
        }
        b.OutputData = engine.TentativeData{} // Clear the data.
        return nil
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index c962aa4bff6..5668449f6c9 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -386,7 +386,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
                        for _, d := range resp.GetData() {
                                cr, ok := 
wk.activeInstructions[d.GetInstructionId()]
                                if !ok {
-                                       slog.Info("data.Recv data for unknown 
bundle", "response", resp)
+                                       slog.Debug("data.Recv data for unknown 
bundle", "response", resp)
                                        continue
                                }
                                // Received data is always for an active 
ProcessBundle instruction
@@ -405,7 +405,7 @@ func (wk *W) Data(data fnpb.BeamFnData_DataServer) error {
                        for _, t := range resp.GetTimers() {
                                cr, ok := 
wk.activeInstructions[t.GetInstructionId()]
                                if !ok {
-                                       slog.Info("data.Recv timers for unknown 
bundle", "response", resp)
+                                       slog.Debug("data.Recv timers for 
unknown bundle", "response", resp)
                                        continue
                                }
                                // Received data is always for an active 
ProcessBundle instruction

Reply via email to