[
https://issues.apache.org/jira/browse/BEAM-13984?focusedWorklogId=754077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754077
]
ASF GitHub Bot logged work on BEAM-13984:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Apr/22 13:41
Start Date: 07/Apr/22 13:41
Worklog Time Spent: 10m
Work Description: yeandy commented on code in PR #17196:
URL: https://github.com/apache/beam/pull/17196#discussion_r845149912
##########
sdks/python/apache_beam/ml/inference/pytorch_impl.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+from typing import Union
+
+import numpy as np
+import pickle
+import torch
+from torch import nn
+
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+ """
+ Implements Pytorch inference method
+ """
+ def __init__(self, input_dim: int, device: torch.device):
+ self._input_dim = input_dim
+ self._device = device
+
+ def run_inference(
+ self, batch: List[Union[np.ndarray, torch.Tensor]],
+ model: nn.Module) -> Iterable[torch.Tensor]:
+ """
+ Runs inferences on a batch of examples and returns an Iterable of
+ Predictions."""
+ if batch:
+ if isinstance(batch[0], np.ndarray):
+ batch = torch.Tensor(batch)
+ elif isinstance(batch[0], torch.Tensor):
+ batch = torch.stack(batch)
+ else:
+ raise ValueError("PCollection must be an numpy array or a torch
Tensor")
+
+ if batch.device != self._device:
+ batch = batch.to(self._device)
+ return model(batch)
+
+ def get_num_bytes(self, batch: List[torch.Tensor]) -> int:
+ """Returns the number of bytes of data for a batch."""
+ total_size = 0
+ for el in batch:
+ if isinstance(el, np.ndarray):
+ total_size += el.itemsize
+ elif isinstance(el, torch.Tensor):
+ total_size += el.element_size()
+ else:
+ total_size += len(pickle.dumps(el))
+ return total_size
+
+ def get_metrics_namespace(self) -> str:
+ """
+ Returns a namespace for metrics collected by the RunInference transform.
+ """
+ return 'RunInferencePytorch'
+
+
+class PytorchModelLoader(ModelLoader):
+ """Loads a Pytorch Model."""
+ def __init__(
+ self,
+ input_dim: int,
+ state_dict_path: str,
Review Comment:
Yes. It is the path to the dictionary of internal model parameters/states.
PyTorch docs calls it `state_dict` during load (i.e. `model.load_state_dict()`)
so I borrowed the name.
##########
sdks/python/apache_beam/ml/inference/pytorch_impl.py:
##########
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Iterable
+from typing import List
+from typing import Union
+
+import numpy as np
+import pickle
+import torch
+from torch import nn
+
+from apache_beam.ml.inference.base import InferenceRunner
+from apache_beam.ml.inference.base import ModelLoader
+
+
+class PytorchInferenceRunner(InferenceRunner):
+ """
+ Implements Pytorch inference method
+ """
+ def __init__(self, input_dim: int, device: torch.device):
Review Comment:
Yes, it's `input_dimension`. But moot now since I've removed this arg.
Issue Time Tracking
-------------------
Worklog Id: (was: 754077)
Time Spent: 2h 10m (was: 2h)
> Implement RunInference for PyTorch
> ----------------------------------
>
> Key: BEAM-13984
> URL: https://issues.apache.org/jira/browse/BEAM-13984
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Andy Ye
> Assignee: Andy Ye
> Priority: P2
> Labels: run-inference
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Implement RunInference for PyTorch as described in the design doc
> [https://s.apache.org/inference-sklearn-pytorch]
> There will be a pytorch_impl.py file that contains PyTorchModelLoader and
> PyTorchInferenceRunner classes.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)