lostluck commented on code in PR #36538:
URL: https://github.com/apache/beam/pull/36538#discussion_r2476050933
##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go:
##########
@@ -98,7 +124,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
cancelFn(err)
terminalOnceWrap()
},
- Logger: s.logger, // TODO substitute with a configured logger.
+ Logger: logger,
Review Comment:
Use logger.With here so the job name is attached automatically to every log
entry from everything started for the job. We can then clean up all the manual
applications of this.
logger.With(slog.String("job", j.String()))
See https://pkg.go.dev/log/slog#Logger.With
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -315,7 +318,7 @@ progress:
for _, rr := range resp.GetResidualRoots() {
ba := rr.GetApplication()
if len(ba.GetElement()) == 0 {
- slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
+ j.Logger.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
Review Comment:
Just as an idle comment: you can imagine how, in some contexts, we could build
up the associated logger so that anything you *always* want in the given
debugging output (like the bundle ID) is attached from the start, so it can be
isolated.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -760,6 +789,8 @@ func (mw *MultiplexW) MakeWorker(id, env string) *W {
Descriptors: make(map[string]*fnpb.ProcessBundleDescriptor),
parentPool: mw,
wg: mw.wg[id],
+
+ Logger: mw.logger,
Review Comment:
`mw.logger.With(slog.String("worker", id))` instead.
Ideally, this would come from the Job's logger directly. Basically, the
"MultiplexW" is shared server infrastructure, not specific to a Job,
but the generated workers are job specific. It looks like it should be
straightforward to add the logger parameter and pass it down.
https://github.com/shunping/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go#L223
Then the worker logs and the job logs are fully associated, for eventual
querying goodness.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -123,7 +124,7 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
wk.activeInstructions[b.InstID] = b
wk.mu.Unlock()
- slog.Debug("processing", "bundle", b, "worker", wk)
+ wk.Logger.Debug("processing", "bundle", b, "worker", wk)
Review Comment:
E.g., the "worker" attribute can be put into the `With` call earlier, when
generating this logger, saving us the effort here and throughout worker/worker.go.
##########
sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go:
##########
@@ -224,6 +224,7 @@ func (j *Job) MakeWorker(env string) *worker.W {
wk.EnvPb = j.Pipeline.GetComponents().GetEnvironments()[env]
wk.PipelineOptions = j.PipelineOptions()
wk.JobKey = j.JobKey()
+ wk.Logger = j.Logger
Review Comment:
Instead of assigning the logger directly, we can call `j.Logger.With` to add the
worker ID to this logger's output, so all calls automatically have it
without us needing to add it manually later.
See https://pkg.go.dev/log/slog#Logger.With