[ 
https://issues.apache.org/jira/browse/BEAM-14337?focusedWorklogId=772655&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-772655
 ]

ASF GitHub Bot logged work on BEAM-14337:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/22 03:03
            Start Date: 20/May/22 03:03
    Worklog Time Spent: 10m 
      Work Description: ryanthompson591 commented on code in PR #17470:
URL: https://github.com/apache/beam/pull/17470#discussion_r877676856


##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -39,25 +42,47 @@ class PytorchInferenceRunner(InferenceRunner):
   def __init__(self, device: torch.device):
     self._device = device
 
-  def run_inference(self, batch: List[torch.Tensor],
-                    model: torch.nn.Module) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[torch.Tensor, Dict[str, torch.Tensor]]],
+      model: torch.nn.Module,
+      prediction_params: Optional[Dict[str, Any]] = None,

Review Comment:
   Instead of having the if statement below, just set {} as the default value 
of the optional parameter.
   
   prediction_params: Optional[Dict[str, Any]] = {}
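
   One caveat with this suggestion, sketched here as plain Python behavior 
rather than anything specific to Beam: a {} default is created once, when the 
function is defined, and is shared by every call that omits the argument. That 
is the usual motivation for the None sentinel in the diff. The two function 
names below are hypothetical and exist only to show the difference.

```python
from typing import Any, Dict, Optional


def with_mutable_default(key: str, params: Dict[str, Any] = {}) -> Dict[str, Any]:
  # The same dict object is reused on every call that omits `params`.
  params[key] = True
  return params


def with_none_sentinel(
    key: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
  # A fresh dict is created on each call that omits `params`.
  if params is None:
    params = {}
  params[key] = True
  return params


shared_first = with_mutable_default('a')
shared_second = with_mutable_default('b')  # Still carries the 'a' entry.
fresh_first = with_none_sentinel('a')
fresh_second = with_none_sentinel('b')  # Contains only the 'b' entry.
```

   If the function only reads the dict, as a call like 
model(**result_dict, **prediction_params) does, the shared default is harmless 
in practice, though linters commonly flag it.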



##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -39,25 +42,47 @@ class PytorchInferenceRunner(InferenceRunner):
   def __init__(self, device: torch.device):
     self._device = device
 
-  def run_inference(self, batch: List[torch.Tensor],
-                    model: torch.nn.Module) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[torch.Tensor, Dict[str, torch.Tensor]]],
+      model: torch.nn.Module,
+      prediction_params: Optional[Dict[str, Any]] = None,
+  ) -> Iterable[PredictionResult]:
     """
     Runs inferences on a batch of Tensors and returns an Iterable of
     Tensor Predictions.
 
     This method stacks the list of Tensors in a vectorized format to optimize
     the inference call.
     """
+    if prediction_params is None:
+      prediction_params = {}
 
-    torch_batch = torch.stack(batch)
-    if torch_batch.device != self._device:
-      torch_batch = torch_batch.to(self._device)
-    predictions = model(torch_batch)
+    if isinstance(batch[0], dict):
+      result_dict = defaultdict(list)
+      for el in batch:

Review Comment:
   nit: prefer descriptive names like element over el.
   
   Looking at this code, it's not very clear why we are doing things this way. 
It's not very clean or Pythonic.
   
   I suggest using a named helper method or something that makes it a little 
clearer why things are done this way.



##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -122,6 +137,94 @@ def test_inference_runner_multiple_tensor_features(self):
     for actual, expected in zip(predictions, expected_predictions):
       self.assertTrue(_compare_prediction_result(actual, expected))
 
+  def test_inference_runner_kwargs(self):
+    examples = [
+        {
+            'k1': torch.from_numpy(np.array([1], dtype="float32")),
+            'k2': torch.from_numpy(np.array([1.5], dtype="float32"))
+        },
+        {
+            'k1': torch.from_numpy(np.array([5], dtype="float32")),
+            'k2': torch.from_numpy(np.array([5.5], dtype="float32"))
+        },
+        {
+            'k1': torch.from_numpy(np.array([-3], dtype="float32")),
+            'k2': torch.from_numpy(np.array([-3.5], dtype="float32"))
+        },
+        {
+            'k1': torch.from_numpy(np.array([10.0], dtype="float32")),
+            'k2': torch.from_numpy(np.array([10.5], dtype="float32"))
+        },
+    ]
+    expected_predictions = [
+        PredictionResult(ex, pred) for ex,
+        pred in zip(
+            examples,
+            torch.Tensor([(example['k1'] * 2.0 + 0.5) +
+                          (example['k2'] * 2.0 + 0.5)
+                          for example in examples]).reshape(-1, 1))
+    ]
+
+    class PytorchLinearRegressionMultipleArgs(torch.nn.Module):
+      def __init__(self, input_dim, output_dim):
+        super().__init__()
+        self.linear = torch.nn.Linear(input_dim, output_dim)
+
+      def forward(self, k1, k2):
+        out = self.linear(k1) + self.linear(k2)
+        return out
+
+    model = PytorchLinearRegressionMultipleArgs(input_dim=1, output_dim=1)
+    model.load_state_dict(
+        OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+                     ('linear.bias', torch.Tensor([0.5]))]))
+    model.eval()
+
+    inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+    predictions = inference_runner.run_inference(examples, model)
+    for actual, expected in zip(predictions, expected_predictions):
+      self.assertTrue(_compare_prediction_result(actual, expected))

Review Comment:
   I'm not sure it's possible, but if it is, ideally we would raise an 
exception that shows the differences between actual and expected.
   
   Instead, we're just expecting a function to return True.



##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -59,6 +63,20 @@ def forward(self, x):
     return out
 
 
+class PytorchLinearRegressionPredictionParams(torch.nn.Module):
+  def __init__(self, input_dim, output_dim):
+    super().__init__()
+    self.linear = torch.nn.Linear(input_dim, output_dim)
+
+  def forward(self, k1, prediction_param_array, prediction_param_bool):

Review Comment:
   Can you document this method?



##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -39,25 +42,47 @@ class PytorchInferenceRunner(InferenceRunner):
   def __init__(self, device: torch.device):
     self._device = device
 
-  def run_inference(self, batch: List[torch.Tensor],
-                    model: torch.nn.Module) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[torch.Tensor, Dict[str, torch.Tensor]]],
+      model: torch.nn.Module,
+      prediction_params: Optional[Dict[str, Any]] = None,
+  ) -> Iterable[PredictionResult]:
     """
     Runs inferences on a batch of Tensors and returns an Iterable of
     Tensor Predictions.
 
     This method stacks the list of Tensors in a vectorized format to optimize
     the inference call.
     """
+    if prediction_params is None:
+      prediction_params = {}
 
-    torch_batch = torch.stack(batch)
-    if torch_batch.device != self._device:
-      torch_batch = torch_batch.to(self._device)
-    predictions = model(torch_batch)
+    if isinstance(batch[0], dict):
+      result_dict = defaultdict(list)
+      for el in batch:
+        for k, v in el.items():
+          result_dict[k].append(v)
+      for k in result_dict:
+        batched_values = torch.stack(result_dict[k])
+        if batched_values.device != self._device:

Review Comment:
   What's the reason for this change? Maybe add a comment explaining why it's 
done this way and why this is not an error.
   
   Could it ever be the case that the device is a GPU when the model is loaded 
but batched_values is on a CPU device, and things still just work cleanly? Or 
is that an error?



##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -122,6 +137,94 @@ def test_inference_runner_multiple_tensor_features(self):
     for actual, expected in zip(predictions, expected_predictions):
       self.assertTrue(_compare_prediction_result(actual, expected))
 
+  def test_inference_runner_kwargs(self):
+    examples = [
+        {
+            'k1': torch.from_numpy(np.array([1], dtype="float32")),
+            'k2': torch.from_numpy(np.array([1.5], dtype="float32"))
+        },
+        {
+            'k1': torch.from_numpy(np.array([5], dtype="float32")),
+            'k2': torch.from_numpy(np.array([5.5], dtype="float32"))
+        },
+        {
+            'k1': torch.from_numpy(np.array([-3], dtype="float32")),
+            'k2': torch.from_numpy(np.array([-3.5], dtype="float32"))
+        },
+        {
+            'k1': torch.from_numpy(np.array([10.0], dtype="float32")),
+            'k2': torch.from_numpy(np.array([10.5], dtype="float32"))
+        },
+    ]
+    expected_predictions = [

Review Comment:
   Where do you validate that the expected predictions are the same as the 
actual predictions?



##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -39,25 +42,47 @@ class PytorchInferenceRunner(InferenceRunner):
   def __init__(self, device: torch.device):
     self._device = device
 
-  def run_inference(self, batch: List[torch.Tensor],
-                    model: torch.nn.Module) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[torch.Tensor, Dict[str, torch.Tensor]]],
+      model: torch.nn.Module,
+      prediction_params: Optional[Dict[str, Any]] = None,
+  ) -> Iterable[PredictionResult]:
     """
     Runs inferences on a batch of Tensors and returns an Iterable of
     Tensor Predictions.
 
     This method stacks the list of Tensors in a vectorized format to optimize
     the inference call.
     """
+    if prediction_params is None:
+      prediction_params = {}
 
-    torch_batch = torch.stack(batch)
-    if torch_batch.device != self._device:
-      torch_batch = torch_batch.to(self._device)
-    predictions = model(torch_batch)
+    if isinstance(batch[0], dict):
+      result_dict = defaultdict(list)
+      for el in batch:
+        for k, v in el.items():
+          result_dict[k].append(v)
+      for k in result_dict:
+        batched_values = torch.stack(result_dict[k])
+        if batched_values.device != self._device:
+          batched_values = batched_values.to(self._device)
+        result_dict[k] = batched_values
+      predictions = model(**result_dict, **prediction_params)

Review Comment:
   The name result_dict reads strangely to me here. Why are these results and 
not samples or examples?



##########
sdks/python/apache_beam/ml/inference/sklearn_inference.py:
##########
@@ -42,8 +44,12 @@ class ModelFileType(enum.Enum):
 
 
 class SklearnInferenceRunner(InferenceRunner):
-  def run_inference(self, batch: List[numpy.ndarray],
-                    model: Any) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[numpy.ndarray],
+      model: Any,
+      prediction_params: Optional[Dict[str, Any]] = None

Review Comment:
   I suggest we don't add a new param to this interface. 
   
   Instead we can just use **kwargs and pass any implementation-specific 
arguments.
   
   This would also be forward-looking: any future implementation that wants 
special parameters specific to itself can get those arguments without every 
other implementation having to change because of a new interface change.
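
   A minimal sketch of the **kwargs idea, assuming a simplified interface. The 
classes below are hypothetical stand-ins, not the actual Beam InferenceRunner 
implementations, and the scale argument is invented for illustration.

```python
from typing import Any, Iterable, List


class InferenceRunner:
  """Hypothetical base interface; not the actual Beam class."""
  def run_inference(self, batch: List[Any], model: Any,
                    **kwargs: Any) -> Iterable[Any]:
    raise NotImplementedError


class PlainRunner(InferenceRunner):
  # Needs no extra arguments, so it simply ignores **kwargs.
  def run_inference(self, batch, model, **kwargs):
    return [model(example) for example in batch]


class ScalingRunner(InferenceRunner):
  # Reads an implementation-specific argument out of **kwargs.
  def run_inference(self, batch, model, **kwargs):
    scale = kwargs.get('scale', 1)
    return [model(example) * scale for example in batch]


def double(x):
  return 2 * x


plain = PlainRunner().run_inference([1, 2], double)
scaled = ScalingRunner().run_inference([1, 2], double, scale=10)
```

   The trade-off is that a misspelled keyword is silently ignored by runners 
that do not look for it, so an explicit parameter gives stricter checking.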



##########
sdks/python/apache_beam/ml/inference/pytorch.py:
##########
@@ -39,25 +42,47 @@ class PytorchInferenceRunner(InferenceRunner):
   def __init__(self, device: torch.device):
     self._device = device
 
-  def run_inference(self, batch: List[torch.Tensor],
-                    model: torch.nn.Module) -> Iterable[PredictionResult]:
+  def run_inference(
+      self,
+      batch: List[Union[torch.Tensor, Dict[str, torch.Tensor]]],
+      model: torch.nn.Module,
+      prediction_params: Optional[Dict[str, Any]] = None,
+  ) -> Iterable[PredictionResult]:
     """
     Runs inferences on a batch of Tensors and returns an Iterable of
     Tensor Predictions.
 
     This method stacks the list of Tensors in a vectorized format to optimize
     the inference call.
     """
+    if prediction_params is None:
+      prediction_params = {}
 
-    torch_batch = torch.stack(batch)
-    if torch_batch.device != self._device:
-      torch_batch = torch_batch.to(self._device)
-    predictions = model(torch_batch)
+    if isinstance(batch[0], dict):
+      result_dict = defaultdict(list)
+      for el in batch:
+        for k, v in el.items():
+          result_dict[k].append(v)
+      for k in result_dict:
+        batched_values = torch.stack(result_dict[k])
+        if batched_values.device != self._device:
+          batched_values = batched_values.to(self._device)
+        result_dict[k] = batched_values
+      predictions = model(**result_dict, **prediction_params)
+    else:

Review Comment:
   Add a comment explaining what this section means.
   
   Something like:
   If the user doesn't pass in XXXX input then the input is of type YYYY



##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -43,10 +43,14 @@
   raise unittest.SkipTest('PyTorch dependencies are not installed')
 
 
-def _compare_prediction_result(a, b):
-  return (
-      torch.equal(a.inference, b.inference) and
-      torch.equal(a.example, b.example))
+def _compare_prediction_result(x, y):

Review Comment:
   Instead of _compare_prediction_result you could make this 
_assert_prediction_result(a, b).
   
   Then if a and b are not equal, you can raise an exception with as meaningful 
an error as possible.



##########
sdks/python/apache_beam/ml/inference/base_test.py:
##########
@@ -39,7 +41,12 @@ class FakeInferenceRunner(base.InferenceRunner):
   def __init__(self, clock=None):
     self._mock_clock = clock
 
-  def run_inference(self, batch: Any, model: Any) -> Iterable[Any]:
+  def run_inference(
+      self,
+      batch: Any,
+      model: Any,
+      prediction_params: Optional[Dict[str, Any]] = None,

Review Comment:
   Is it possible to not have this as a required param? We could have it only 
on run_inference implementations that need it.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 772655)
    Time Spent: 2h 40m  (was: 2.5h)

> Support **kwargs for PyTorch models.
> ------------------------------------
>
>                 Key: BEAM-14337
>                 URL: https://issues.apache.org/jira/browse/BEAM-14337
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Anand Inguva
>            Assignee: Andy Ye
>            Priority: P2
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Some PyTorch models that subclass torch.nn.Module have extra 
> parameters in the forward function call. These extra parameters can be passed 
> as a Dict or as positional arguments. 
> Example of PyTorch models supported by Hugging Face -> 
> [https://huggingface.co/bert-base-uncased]
> [Some torch models on Hugging 
> Face|https://github.com/huggingface/transformers/blob/main/src/transformers/models/bert/modeling_bert.py]
> E.g.: 
> [https://huggingface.co/docs/transformers/model_doc/bert#transformers.BertModel]
> {code:python}
> from transformers import BertModel
>
> # Tensor1, Tensor2 and Tensor3 are placeholders for torch.Tensor inputs.
> inputs = {
>      "input_ids": Tensor1,
>      "attention_mask": Tensor2,
>      "token_type_ids": Tensor3,
> } 
> # BertModel is a subclass of torch.nn.Module.
> model = BertModel.from_pretrained("bert-base-uncased")
> # The model's forward method should accept the keys in inputs as
> # keyword arguments.
> outputs = model(**inputs){code}
>  
> [Transformers|https://pytorch.org/hub/huggingface_pytorch-transformers/] 
> integrated in Pytorch is supported by Hugging Face as well. 
>  
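
The keyword unpacking in the issue description can be sketched without PyTorch. TwoInputModel below is a hypothetical stand-in for a torch.nn.Module whose forward takes several named arguments; only the dictionary-unpacking behavior is being illustrated.

```python
class TwoInputModel:
  """Hypothetical stand-in for a torch.nn.Module with a multi-argument forward."""
  def forward(self, input_ids, attention_mask=None):
    return {'input_ids': input_ids, 'attention_mask': attention_mask}

  def __call__(self, *args, **kwargs):
    return self.forward(*args, **kwargs)


inputs = {'input_ids': [101, 102], 'attention_mask': [1, 1]}
model = TwoInputModel()
# model(**inputs) is equivalent to
# model(input_ids=[101, 102], attention_mask=[1, 1]).
outputs = model(**inputs)

# A key that forward does not accept is rejected with a TypeError.
try:
  model(**{'unknown_key': 0})
  error = None
except TypeError as e:
  error = type(e).__name__
```

This is why the dictionary keys in each example need to match the parameter names of the model's forward method.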



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
