tvalentyn commented on code in PR #31052:
URL: https://github.com/apache/beam/pull/31052#discussion_r1576791498
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1434,19 +1519,27 @@ def load():
     if isinstance(side_input_model_path, str) and side_input_model_path != '':
       model_tag = side_input_model_path
     if self._model_handler.share_model_across_processes():
-      model = multi_process_shared.MultiProcessShared(
-          load, tag=model_tag, always_proxy=True).acquire()
+      models = []
+      for i in range(self._model_handler.model_copies()):
+        models.append(
+            multi_process_shared.MultiProcessShared(
Review Comment:
> will side input update iteration result in creating a new multiprocess shared handle without releasing the previous one

As far as I can tell, we will create a new `model_wrapper` after calling `_RunInferenceDoFn.update_model()` from
https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1525
and then we will create a new `ModelWrapper` with new `SharedHandle`(s) in:
https://github.com/apache/beam/blob/58bea3f19e5220c08a900a3005e80473561dabaf/sdks/python/apache_beam/ml/inference/base.py#L1474
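
For reference, here is a minimal sketch of the acquire/release pattern being discussed. The `model_handler`, `load`, and `model_tag` names and the per-copy tag scheme are stand-ins for the PR's actual locals, and the `release()` call assumes `MultiProcessShared` exposes one:

```python
from apache_beam.utils import multi_process_shared


def acquire_model_copies(model_handler, load, model_tag):
  # Mirrors the new code path in this diff: one MultiProcessShared handle
  # (and one acquired proxy) per model copy. The f'{model_tag}{i}' tag
  # scheme is illustrative; the PR may derive per-copy tags differently.
  handles = []
  models = []
  for i in range(model_handler.model_copies()):
    shared = multi_process_shared.MultiProcessShared(
        load, tag=f'{model_tag}{i}', always_proxy=True)
    handles.append(shared)
    models.append(shared.acquire())
  return handles, models


def release_model_copies(handles, models):
  # What releasing the previous copies on a side input model update could
  # look like, assuming MultiProcessShared.release() is available; without
  # something like this, each update leaves the old proxies acquired.
  for shared, model in zip(handles, models):
    shared.release(model)
```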