lostluck commented on code in PR #35991:
URL: https://github.com/apache/beam/pull/35991#discussion_r2307798661


##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -727,6 +732,10 @@ func NewMultiplexW(lis net.Listener, g *grpc.Server, 
logger *slog.Logger) *Multi
 func (mw *MultiplexW) MakeWorker(id, env string) *W {
        mw.mu.Lock()
        defer mw.mu.Unlock()
+       jobId := strings.TrimSuffix(id, "_"+env)

Review Comment:
   The Job.MakeWorker is already adding the env to the id when it's passed 
in.(jobservices/job.go:207), so this is adding it a second time. We can make 
this more concretely be the "job id" and the "env" id, and have the multiplexer 
manage what the final id ends up being.
   
   That's probably why you needed to do this here. But it does mean 
"WaitForCleanup" isn't going to work properly, since there won't be a Waitgroup 
there.



##########
sdks/go/pkg/beam/runners/prism/internal/worker/worker.go:
##########
@@ -801,6 +813,24 @@ func (mw *MultiplexW) delete(w *W) {
        delete(mw.pool, w.ID)
 }
 
+// WaitForCleanUp waits until all resources relevant to the job are cleaned up.
+func (mw *MultiplexW) WaitForCleanUp(id string) {
+       const cleanUpTimeout = 60 * time.Second
+       c := make(chan struct{})
+
+       go func() {
+               defer close(c)
+               mw.wg[id].Wait()

Review Comment:
   I'd check if the wg is nil here, incase the ids are incorrect/missing. Check 
if the value exists in the map.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to