lrakla commented on code in PR #29122:
URL: https://github.com/apache/beam/pull/29122#discussion_r1370940999


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -583,11 +583,12 @@ def _schedule_periodic_shutdown(self):
     # type: () -> None
     def shutdown_inactive_bundle_processors():
       # type: () -> None
-      for descriptor_id, last_access_time in self.last_access_times.items():
-        if (time.time() - last_access_time >
-            DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S):
-          BundleProcessorCache._shutdown_cached_bundle_processors(
-              self.cached_bundle_processors[descriptor_id])
+      with self._lock:

Review Comment:
   @AnandInguva I was not sure how to reproduce it. I can take a look based on 
@tvalentyn's comments and try reproducing it locally. This is my first 
contribution to the project, so any guidance is appreciated. 
   
   @tvalentyn Do you mean moving the lock definition before 
`self._schedule_periodic_shutdown()`, i.e. swapping lines 441 and 442? 
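
   To make the ordering question concrete, here is a minimal sketch of why the 
lock would need to exist before the periodic task is scheduled. It is not the 
Beam code: `Cache`, `lock_first`, `_start_background_task`, `items` and `errors` 
are hypothetical names, and the `join()` call only forces the worst-case 
interleaving so the demo is deterministic. In real code the outcome would depend 
on when the background thread first runs.

```python
import threading


class Cache:
    """Hypothetical stand-in for a cache that starts a background task in __init__."""

    def __init__(self, lock_first):
        self.errors = []
        self.items = {}
        if lock_first:
            # Lock is defined before the task that uses it is started.
            self._lock = threading.Lock()
            self._start_background_task()
        else:
            # Task is started first; it may run before self._lock exists.
            self._start_background_task()
            self._lock = threading.Lock()

    def _start_background_task(self):
        def task():
            try:
                with self._lock:
                    self.items.clear()
            except AttributeError as exc:
                # self._lock was not assigned yet when the task ran.
                self.errors.append(exc)

        worker = threading.Thread(target=task)
        worker.start()
        # Assumption for the demo only: wait for the task so it always runs
        # before __init__ continues, which exposes the ordering problem.
        worker.join()


safe = Cache(lock_first=True)
unsafe = Cache(lock_first=False)
print(len(safe.errors), len(unsafe.errors))
```

   With the lock assigned first, the task acquires it normally; with the order 
reversed, the task hits an `AttributeError` because `self._lock` is not yet set.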



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.