lostluck commented on code in PR #35991: URL: https://github.com/apache/beam/pull/35991#discussion_r2307798661
########## sdks/go/pkg/beam/runners/prism/internal/worker/worker.go: ########## @@ -727,6 +732,10 @@ func NewMultiplexW(lis net.Listener, g *grpc.Server, logger *slog.Logger) *Multi func (mw *MultiplexW) MakeWorker(id, env string) *W { mw.mu.Lock() defer mw.mu.Unlock() + jobId := strings.TrimSuffix(id, "_"+env) Review Comment: The Job.MakeWorker is already adding the env to the id when it's passed in.(jobservices/job.go:207), so this is adding it a second time. We can make this more concretely be the "job id" and the "env" id, and have the multiplexer manage what the final id ends up being. That's probably why you needed to do this here. But it does mean "WaitForCleanup" isn't going to work properly, since there won't be a Waitgroup there. ########## sdks/go/pkg/beam/runners/prism/internal/worker/worker.go: ########## @@ -801,6 +813,24 @@ func (mw *MultiplexW) delete(w *W) { delete(mw.pool, w.ID) } +// WaitForCleanUp waits until all resources relevant to the job are cleaned up. +func (mw *MultiplexW) WaitForCleanUp(id string) { + const cleanUpTimeout = 60 * time.Second + c := make(chan struct{}) + + go func() { + defer close(c) + mw.wg[id].Wait() Review Comment: I'd check if the wg is nil here, incase the ids are incorrect/missing. Check if the value exists in the map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org