lostluck commented on code in PR #24808:
URL: https://github.com/apache/beam/pull/24808#discussion_r1059095468
##########
sdks/go/pkg/beam/runners/universal/extworker/extworker.go:
##########
@@ -106,12 +106,16 @@ func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest)
// Stop terminates the service and stops all workers.
func (s *Loopback) Stop(ctx context.Context) error {
s.mu.Lock()
- defer s.mu.Unlock()
log.Infof(ctx, "stopping Loopback, and %d workers", len(s.workers))
s.workers = map[string]context.CancelFunc{}
s.lis.Close()
s.rootCancel()
+
+ // There can be a deadlock between the StopWorker RPC and GracefulStop
+ // which waits for all RPCs to finish, so it must be outside the critical section.
+ s.mu.Unlock()
Review Comment:
It does not. And looking at the Stop and GracefulStop implementations, those
already handle closing the listener, so I've removed that unnecessary line.
There was also a race during the Stop call: if a request to start a new worker
came in while Stop was running, it would have happily been started, so that
request now returns an error instead.
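Roughly the shape of that guard, as a sketch rather than the actual extworker
change (the `stopped` flag, field names, and error text here are illustrative
assumptions):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// loopback is a simplified stand-in for the extworker Loopback service,
// just to illustrate rejecting new workers once Stop has begun.
type loopback struct {
	mu      sync.Mutex
	stopped bool // illustrative flag; the real fix may gate on other state
	workers map[string]context.CancelFunc
}

// startWorker refuses to register new workers after stop has run.
func (s *loopback) startWorker(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stopped {
		return errors.New("loopback server stopped: cannot start worker " + id)
	}
	_, cancel := context.WithCancel(context.Background())
	s.workers[id] = cancel
	return nil
}

// stop cancels all workers and marks the server as stopped so that
// late start requests error out instead of racing with shutdown.
func (s *loopback) stop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, cancel := range s.workers {
		cancel()
	}
	s.workers = map[string]context.CancelFunc{}
	s.stopped = true
}

func main() {
	s := &loopback{workers: map[string]context.CancelFunc{}}
	fmt.Println(s.startWorker("worker-1")) // <nil>
	s.stop()
	fmt.Println(s.startWorker("worker-2")) // error: loopback server stopped
}
```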
I've run the new runner's suite of pipelines 1000 times each with the race
detector on and off (so over 10k runs of this sequence in total), and these
changes avoid the occasional flake I was seeing. This, along with earlier good
goroutine hygiene, made the flakes easier to find in the timeout dumps.
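For context, the deadlock the new comment in the diff describes comes from
calling the graceful gRPC shutdown while still holding the mutex that an
in-flight StopWorker RPC needs. A minimal sketch of the safe ordering (the
`grpcServer` field name and the surrounding setup are assumptions for
illustration, not the actual Beam code):

```go
package main

import (
	"context"
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"
)

// server sketches the shutdown ordering: release the mutex before
// GracefulStop, since GracefulStop blocks until in-flight RPCs (which
// may themselves need the mutex, like StopWorker) have returned.
type server struct {
	mu         sync.Mutex
	workers    map[string]context.CancelFunc
	rootCancel context.CancelFunc
	grpcServer *grpc.Server // assumed field name for illustration
}

func (s *server) stop(ctx context.Context) error {
	s.mu.Lock()
	log.Printf("stopping loopback server and %d workers", len(s.workers))
	s.workers = map[string]context.CancelFunc{}
	s.rootCancel()
	// Unlock before GracefulStop: an in-flight StopWorker RPC needs this
	// mutex to finish, and GracefulStop waits for that RPC to finish.
	s.mu.Unlock()

	s.grpcServer.GracefulStop()
	return nil
}

func main() {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		log.Fatal(err)
	}
	_, rootCancel := context.WithCancel(context.Background())
	s := &server{
		workers:    map[string]context.CancelFunc{},
		rootCancel: rootCancel,
		grpcServer: grpc.NewServer(),
	}
	go s.grpcServer.Serve(lis)
	if err := s.stop(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```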