[ 
https://issues.apache.org/jira/browse/BEAM-13984?focusedWorklogId=754075&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754075
 ]

ASF GitHub Bot logged work on BEAM-13984:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Apr/22 13:40
            Start Date: 07/Apr/22 13:40
    Worklog Time Spent: 10m 
      Work Description: yeandy commented on code in PR #17196:
URL: https://github.com/apache/beam/pull/17196#discussion_r845149318


##########
sdks/python/apache_beam/ml/inference/pytorch_impl.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+from typing import Union
+
+import numpy as np
+import pickle
+import torch
+from torch import nn
+
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  Implements Pytorch inference method
+  """
+  def __init__(self, input_dim: int, device: torch.device):
+    self._input_dim = input_dim
+    self._device = device
+
+  def run_inference(
+      self, batch: List[Union[np.ndarray, torch.Tensor]],
+      model: nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+    if batch:
+      if isinstance(batch[0], np.ndarray):
+        batch = torch.Tensor(batch)
+      elif isinstance(batch[0], torch.Tensor):
+        batch = torch.stack(batch)
+      else:
+        raise ValueError("PCollection must be an numpy array or a torch 
Tensor")
+
+    if batch.device != self._device:
+      batch = batch.to(self._device)
+    return model(batch)
+
+  def get_num_bytes(self, batch: List[torch.Tensor]) -> int:
+    """Returns the number of bytes of data for a batch."""
+    total_size = 0
+    for el in batch:
+      if isinstance(el, np.ndarray):
+        total_size += el.itemsize
+      elif isinstance(el, torch.Tensor):
+        total_size += el.element_size()
+      else:
+        total_size += len(pickle.dumps(el))

Review Comment:
   True. I've simplified to just compute size for Tensors.



##########
sdks/python/apache_beam/ml/inference/pytorch_impl.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+from typing import Union
+
+import numpy as np
+import pickle
+import torch
+from torch import nn
+
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+  """
+  Implements Pytorch inference method
+  """
+  def __init__(self, input_dim: int, device: torch.device):
+    self._input_dim = input_dim
+    self._device = device
+
+  def run_inference(
+      self, batch: List[Union[np.ndarray, torch.Tensor]],
+      model: nn.Module) -> Iterable[torch.Tensor]:
+    """
+    Runs inferences on a batch of examples and returns an Iterable of
+    Predictions."""
+    if batch:
+      if isinstance(batch[0], np.ndarray):
+        batch = torch.Tensor(batch)
+      elif isinstance(batch[0], torch.Tensor):
+        batch = torch.stack(batch)
+      else:
+        raise ValueError("PCollection must be an numpy array or a torch 
Tensor")
+
+    if batch.device != self._device:
+      batch = batch.to(self._device)
+    return model(batch)
+
+  def get_num_bytes(self, batch: List[torch.Tensor]) -> int:
+    """Returns the number of bytes of data for a batch."""
+    total_size = 0
+    for el in batch:
+      if isinstance(el, np.ndarray):
+        total_size += el.itemsize
+      elif isinstance(el, torch.Tensor):
+        total_size += el.element_size()
+      else:
+        total_size += len(pickle.dumps(el))
+    return total_size
+
+  def get_metrics_namespace(self) -> str:
+    """
+    Returns a namespace for metrics collected by the RunInference transform.
+    """
+    return 'RunInferencePytorch'
+
+
+class PytorchModelLoader(ModelLoader):
+  """Loads a Pytorch Model."""
+  def __init__(
+      self,
+      input_dim: int,

Review Comment:
   Not needed if input is only Tensor types. Removed.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 754075)
    Time Spent: 1h 50m  (was: 1h 40m)

> Implement RunInference for PyTorch
> ----------------------------------
>
>                 Key: BEAM-13984
>                 URL: https://issues.apache.org/jira/browse/BEAM-13984
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Andy Ye
>            Assignee: Andy Ye
>            Priority: P2
>              Labels: run-inference
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Implement RunInference for PyTorch as described in the design doc 
> [https://s.apache.org/inference-sklearn-pytorch]
> There will be a pytorch_impl.py file that contains PyTorchModelLoader and 
> PyTorchInferenceRunner classes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to