[ 
https://issues.apache.org/jira/browse/BEAM-13984?focusedWorklogId=754108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754108
 ]

ASF GitHub Bot logged work on BEAM-13984:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Apr/22 14:14
            Start Date: 07/Apr/22 14:14
    Worklog Time Spent: 10m 
      Work Description: yeandy commented on code in PR #17196:
URL: https://github.com/apache/beam/pull/17196#discussion_r845189512


##########
sdks/python/apache_beam/ml/inference/pytorch_impl.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+from typing import Union
+
+import numpy as np
+import pickle
+import torch
+from torch import nn
+
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  Implements Pytorch inference method
+  """
+  def __init__(self, input_dim: int, device: torch.device):
+    self._input_dim = input_dim
+    self._device = device
+
+  def run_inference(
+      self, batch: List[Union[np.ndarray, torch.Tensor]],
+      model: nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+    if batch:
+      if isinstance(batch[0], np.ndarray):
+        batch = torch.Tensor(batch)
+      elif isinstance(batch[0], torch.Tensor):
+        batch = torch.stack(batch)
+      else:
+        raise ValueError("PCollection must be an numpy array or a torch 
Tensor")
+
+    if batch.device != self._device:
+      batch = batch.to(self._device)
+    return model(batch)
+
+  def get_num_bytes(self, batch: List[torch.Tensor]) -> int:
+    """Returns the number of bytes of data for a batch."""
+    total_size = 0
+    for el in batch:
+      if isinstance(el, np.ndarray):
+        total_size += el.itemsize
+      elif isinstance(el, torch.Tensor):
+        total_size += el.element_size()
+      else:
+        total_size += len(pickle.dumps(el))
+    return total_size
+
+  def get_metrics_namespace(self) -> str:
+    """
+    Returns a namespace for metrics collected by the RunInference transform.
+    """
+    return 'RunInferencePytorch'
+
+
+class PytorchModelLoader(ModelLoader):
+  """Loads a Pytorch Model."""
+  def __init__(
+      self,
+      input_dim: int,
+      state_dict_path: str,
+      model_class: nn.Module,
+      device: str = 'CPU',
+      map_location=None):
+    """
+    input_dim: dimension (# of features) of the data
+    state_dict_path: path to the saved dictionary of the model state.
+    model_class: class of the Pytorch model that defines the model structure.
+    device: the device on which you wish to run the model. If ``device = GPU``
+        then device will be cuda if it is avaiable. Otherwise, it will be cpu.
+    map_location:
+
+    See https://pytorch.org/tutorials/beginner/saving_loading_models.html
+    for details
+    """
+    self._input_dim = input_dim
+    self._state_dict_path = state_dict_path
+    if device == 'GPU' and torch.cuda.is_available():
+      self._device = torch.device('cuda')
+    else:
+      self._device = torch.device('cpu')
+    self._model_class = model_class
+    self._model_class.to(self._device)
+    self._map_location = map_location
+
+  def load_model(self) -> nn.Module:
+    """Loads and initializes a Pytorch model for processing."""
+    model = self._model_class
+    model.load_state_dict(
+        torch.load(self._state_dict_path, map_location=self._map_location))

Review Comment:
   Good point, I can look into adding that. My initial work was assuming that 
that we are reading only from local filesystem. 
   
   Summary
   - `self._state_dict_path` is that path to a file that stores model states.
   - And `self._model_class` is a Python Pytorch class that defines the model 
structure.
   
   We're basically reading in a dictionary of coefficients/parameters/states 
that specify how to populate the model's structure (passed in via the argument 
`model_class`) with certain values.
   
   The `load_model` will then be acquired by a `Shared()` instance.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 754108)
    Time Spent: 2h 40m  (was: 2.5h)

> Implement RunInference for PyTorch
> ----------------------------------
>
>                 Key: BEAM-13984
>                 URL: https://issues.apache.org/jira/browse/BEAM-13984
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Andy Ye
>            Assignee: Andy Ye
>            Priority: P2
>              Labels: run-inference
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Implement RunInference for PyTorch as described in the design doc 
> [https://s.apache.org/inference-sklearn-pytorch]
> There will be a pytorch_impl.py file that contains PyTorchModelLoader and 
> PyTorchInferenceRunner classes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to