AnandInguva commented on code in PR #28161:
URL: https://github.com/apache/beam/pull/28161#discussion_r1309113056
##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1108,25 +1292,36 @@ def _run_inference(self, batch, inference_args):
return predictions
def process(
- self, batch, inference_args, si_model_metadata: Optional[ModelMetadata]):
+ self,
+ batch,
+ inference_args,
+ si_model_metadata: Optional[Union[ModelMetadata,
+ List[ModelMetadata],
+ List[KeyModelPathMapping]]]):
"""
When side input is enabled:
The method checks if the side input model has been updated, and if so,
updates the model and runs inference on the batch of data. If the
side input is empty or the model has not been updated, the method
simply runs inference on the batch of data.
"""
- if si_model_metadata:
- if isinstance(si_model_metadata, beam.pvalue.EmptySideInput):
- self.update_model(side_input_model_path=None)
- return self._run_inference(batch, inference_args)
- elif self._side_input_path != si_model_metadata.model_id:
- self._side_input_path = si_model_metadata.model_id
- self._metrics_collector = self.get_metrics_collector(
- prefix=si_model_metadata.model_name)
- with threading.Lock():
- self.update_model(si_model_metadata.model_id)
- return self._run_inference(batch, inference_args)
+ if not si_model_metadata:
+ return self._run_inference(batch, inference_args)
+
+ if isinstance(si_model_metadata, beam.pvalue.EmptySideInput):
+ self.update_model(side_input_model_path=None)
+ elif isinstance(si_model_metadata, List) and hasattr(si_model_metadata[0],
+ 'keys'):
+ # TODO(https://github.com/apache/beam/issues/27628): Update metrics here
+ with threading.Lock():
+ self.update_model(si_model_metadata)
Review Comment:
During the update phase, we want to update the model and run inference under
the same threading.Lock(). The intent is that if any threads are still working
with the old model, the update only happens once those threads release the
lock. This is sub-optimal right now, and we may need to introduce a read-write
lock to prevent that case; I don't yet have an example that reproduces it.
With this change, it looks like run_inference runs outside of the lock. Is
that intended?
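For illustration, here is a minimal reader-writer sketch of the pattern I have
in mind (a hypothetical helper, not the Beam API; the _RWLockedModel name and
the infer_fn parameter are assumptions for the example). Inference threads
take a read reference to the current model, and a model update blocks until
all in-flight readers drain:

  import threading

  class _RWLockedModel:
    # Hypothetical helper, not Beam API: many inference threads may read
    # the current model concurrently; update_model waits until in-flight
    # readers drain before swapping the new model in.
    def __init__(self, model):
      self._model = model
      self._readers = 0
      self._lock = threading.Lock()
      self._no_readers = threading.Condition(self._lock)

    def run_inference(self, batch, infer_fn):
      # Reader: grab a model reference under the lock, run inference
      # outside it so readers do not serialize on each other.
      with self._lock:
        self._readers += 1
        model = self._model
      try:
        return infer_fn(model, batch)
      finally:
        with self._lock:
          self._readers -= 1
          if self._readers == 0:
            self._no_readers.notify_all()

    def update_model(self, new_model):
      # Writer: wait for in-flight inferences to finish, then swap.
      with self._no_readers:
        while self._readers > 0:
          self._no_readers.wait()
        self._model = new_model

Note this simple version can starve the writer if readers keep arriving; a
production read-write lock would also need some fairness handling.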