damccorm commented on code in PR #28161:
URL: https://github.com/apache/beam/pull/28161#discussion_r1309132628


##########
sdks/python/apache_beam/ml/inference/base.py:
##########
@@ -1108,25 +1292,36 @@ def _run_inference(self, batch, inference_args):
     return predictions
 
   def process(
-      self, batch, inference_args, si_model_metadata: Optional[ModelMetadata]):
+      self,
+      batch,
+      inference_args,
+      si_model_metadata: Optional[Union[ModelMetadata,
+                                        List[ModelMetadata],
+                                        List[KeyModelPathMapping]]]):
     """
     When side input is enabled:
       The method checks if the side input model has been updated, and if so,
       updates the model and runs inference on the batch of data. If the
       side input is empty or the model has not been updated, the method
       simply runs inference on the batch of data.
     """
-    if si_model_metadata:
-      if isinstance(si_model_metadata, beam.pvalue.EmptySideInput):
-        self.update_model(side_input_model_path=None)
-        return self._run_inference(batch, inference_args)
-      elif self._side_input_path != si_model_metadata.model_id:
-        self._side_input_path = si_model_metadata.model_id
-        self._metrics_collector = self.get_metrics_collector(
-            prefix=si_model_metadata.model_name)
-        with threading.Lock():
-          self.update_model(si_model_metadata.model_id)
-          return self._run_inference(batch, inference_args)
+    if not si_model_metadata:
+      return self._run_inference(batch, inference_args)
+
+    if isinstance(si_model_metadata, beam.pvalue.EmptySideInput):
+      self.update_model(side_input_model_path=None)
+    elif isinstance(si_model_metadata, List) and hasattr(si_model_metadata[0],
+                                                         'keys'):
+      # TODO(https://github.com/apache/beam/issues/27628): Update metrics here
+      with threading.Lock():
+        self.update_model(si_model_metadata)

Review Comment:
   I think we don't need the lock at all for this version of update since the 
single multi_process_shared object acts as a lock, and we'll be retaining 
multiple different models at once anyways (the ModelManager already handles all 
this logic today and guarantees a model won't be released until no consumer is 
trying to use it). We should retain the previous locking behavior for other 
cases. Good catch, updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to