tvalentyn commented on code in PR #29122:
URL: https://github.com/apache/beam/pull/29122#discussion_r1370750624
##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -583,11 +583,12 @@ def _schedule_periodic_shutdown(self):
     # type: () -> None
     def shutdown_inactive_bundle_processors():
       # type: () -> None
-      for descriptor_id, last_access_time in self.last_access_times.items():
-        if (time.time() - last_access_time >
-            DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S):
-          BundleProcessorCache._shutdown_cached_bundle_processors(
-              self.cached_bundle_processors[descriptor_id])
+      with self._lock:
Review Comment:
It is a good idea to test these changes, and yes, this one may be tricky to
test.
According to the code coverage report on this PR, this code path might not be
covered by unit tests, but it would be triggered in long-running pipelines. To
test locally, it should be possible to set the value defined at
https://github.com/apache/beam/blob/0f7698143261271cf0fdb527b833e73997f10748/sdks/python/apache_beam/runners/worker/sdk_worker.py#L82
to something lower, which might help reproduce the original issue.
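The comment itself does not name the failure. A minimal sketch follows, assuming the original issue is the `last_access_times` dict being resized by another thread while the periodic task iterates over `last_access_times.items()`.

In CPython, growing a dict during iteration raises `RuntimeError`, and that error class can be reproduced deterministically in a single thread. The function name below is hypothetical. The insert inside the loop stands in for the concurrent write.

```python
def resize_during_iteration():
  """Return the error raised when a dict grows while it is being iterated.

  Hypothetical single-threaded stand-in: the insert inside the loop plays the
  role of another thread caching a new descriptor during the sweep.
  """
  last_access_times = {'descriptor-1': 0.0}
  try:
    for _descriptor_id, _last_access in last_access_times.items():
      # Simulated concurrent insert; the next iteration step detects it.
      last_access_times['descriptor-2'] = 1.0
  except RuntimeError as e:
    return str(e)
  return None


print(resize_during_iteration())  # prints: dictionary changed size during iteration
```

Holding one lock in the sweep and in every writer rules this out, and the diff starts doing so with `with self._lock:`. The writer side is not visible in this hunk.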
As for a test, we could decrease the value via `unittest.mock.patch` inside a
unit test that exercises an SDK worker. For example, we could modify this param
in an existing FnAPI runner test scenario and verify that this code path is
triggered. @AnandInguva, feel free to TAL and suggest a test that would be
helpful.
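The `unittest.mock.patch` idea can be sketched without a real SDK worker. In this sketch, `ToyProcessorCache` and its `SHUTDOWN_THRESHOLD_S` attribute are hypothetical stand-ins for the worker's cache and `DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S`. The test lowers the threshold so the rarely hit sweep actually fires.

```python
import threading
import time
import unittest
from unittest import mock


class ToyProcessorCache:
  """Hypothetical stand-in for the worker's cache; not Beam's real class."""

  # Plays the role of DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S.
  SHUTDOWN_THRESHOLD_S = 60.0

  def __init__(self):
    self._lock = threading.Lock()
    self.last_access_times = {}
    self.shut_down = []

  def access(self, descriptor_id):
    with self._lock:
      self.last_access_times[descriptor_id] = time.time()

  def shutdown_inactive(self):
    # Same shape as the patched code path: sweep while holding the lock.
    with self._lock:
      now = time.time()
      for descriptor_id, last_access in self.last_access_times.items():
        if now - last_access > self.SHUTDOWN_THRESHOLD_S:
          self.shut_down.append(descriptor_id)


class ShutdownInactiveTest(unittest.TestCase):

  def test_default_threshold_keeps_fresh_entries(self):
    cache = ToyProcessorCache()
    cache.access('d1')
    cache.shutdown_inactive()
    self.assertEqual(cache.shut_down, [])

  def test_lowered_threshold_triggers_shutdown(self):
    # A negative threshold makes even a just-accessed entry count as inactive,
    # which avoids flakiness from coarse time.time() resolution.
    with mock.patch.object(ToyProcessorCache, 'SHUTDOWN_THRESHOLD_S', -1.0):
      cache = ToyProcessorCache()
      cache.access('d1')
      cache.shutdown_inactive()
    self.assertEqual(cache.shut_down, ['d1'])
```

In Beam itself, the analogous target would presumably be the module-level constant, e.g. `mock.patch('apache_beam.runners.worker.sdk_worker.DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S', ...)`. That path is inferred from the file path and the name in the diff.

Patching only affects code that reads the name at call time. A value already captured before the patch, e.g. as an interval passed when the thread was scheduled, would not change.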
Also, I would move the lock definition at
https://github.com/apache/beam/blob/0f7698143261271cf0fdb527b833e73997f10748/sdks/python/apache_beam/runners/worker/sdk_worker.py#L442
a few lines up, before we schedule this thread.
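The point of defining the lock before scheduling the thread can be sketched as follows. The class and attribute names are hypothetical. A plain `threading.Thread` with an `Event`-based loop stands in for whatever periodic-thread helper the worker actually uses.

```python
import threading
import time


class ToyCacheWithPeriodicSweep:
  """Hypothetical sketch; names do not come from Beam."""

  def __init__(self, interval_s):
    # Create everything the background task touches, the lock included,
    # *before* starting the thread, so its first tick can never see a
    # half-initialized object (e.g. AttributeError on self._lock).
    self._lock = threading.Lock()
    self.last_access_times = {}
    self.sweeps = 0
    self._stop_event = threading.Event()
    self._thread = threading.Thread(
        target=self._run, args=(interval_s,), daemon=True)
    self._thread.start()  # scheduled last

  def _run(self, interval_s):
    # Event.wait returns False on timeout (do a sweep) and True once stop()
    # sets the event (exit the loop).
    while not self._stop_event.wait(interval_s):
      with self._lock:
        # A real sweep would inspect last_access_times here.
        self.sweeps += 1

  def access(self, descriptor_id):
    with self._lock:
      self.last_access_times[descriptor_id] = time.time()

  def stop(self):
    self._stop_event.set()
    self._thread.join()
```

Starting the thread as the last step of `__init__` keeps the ordering obvious. Any attribute added later that the sweep uses should also go above the `start()` call.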
--
This is an automated message from the Apache Git Service.