[
https://issues.apache.org/jira/browse/BEAM-13984?focusedWorklogId=758069&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-758069
]
ASF GitHub Bot logged work on BEAM-13984:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Apr/22 19:06
Start Date: 18/Apr/22 19:06
Worklog Time Spent: 10m
Work Description: yeandy commented on code in PR #17196:
URL: https://github.com/apache/beam/pull/17196#discussion_r852340653
##########
sdks/python/apache_beam/ml/inference/pytorch_test.py:
##########
@@ -0,0 +1,246 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import os
+import shutil
+import tempfile
+import unittest
+from collections import OrderedDict
+
+import numpy as np
+import pytest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# Protect against environments where pytorch library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+ import torch
+ from apache_beam.ml.inference.api import PredictionResult
+ from apache_beam.ml.inference.base import RunInference
+ from apache_beam.ml.inference.pytorch import PytorchInferenceRunner
+ from apache_beam.ml.inference.pytorch import PytorchModelLoader
+except ImportError:
+ raise unittest.SkipTest('PyTorch dependencies are not installed')
+
+
+def _compare_prediction_result(a, b):
+ return (
+ torch.equal(a.inference, b.inference) and
+ torch.equal(a.example, b.example))
+
+
class PytorchLinearRegression(torch.nn.Module):
  """Toy regression model: a single ``torch.nn.Linear`` layer."""
  def __init__(self, input_dim, output_dim):
    super().__init__()
    # The attribute must be named ``linear`` so the state-dict keys are
    # ``linear.weight`` / ``linear.bias``, which the tests load explicitly.
    self.linear = torch.nn.Linear(input_dim, output_dim)

  def forward(self, x):
    """Apply the linear layer to ``x`` and return the result."""
    return self.linear(x)
+
+
[email protected]_pytorch
+class PytorchRunInferenceTest(unittest.TestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def test_inference_runner_single_tensor_feature(self):
+ examples = [
+ torch.from_numpy(np.array([1], dtype="float32")),
+ torch.from_numpy(np.array([5], dtype="float32")),
+ torch.from_numpy(np.array([-3], dtype="float32")),
+ torch.from_numpy(np.array([10.0], dtype="float32")),
+ ]
+ expected_predictions = [
+ PredictionResult(ex, pred) for ex,
+ pred in zip(
+ examples,
+ torch.Tensor([example * 2.0 + 0.5
+ for example in examples]).reshape(-1, 1))
+ ]
+
+ model = PytorchLinearRegression(input_dim=1, output_dim=1)
+ model.load_state_dict(
+ OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+ ('linear.bias', torch.Tensor([0.5]))]))
+ model.eval()
+
+ inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+ predictions = inference_runner.run_inference(examples, model)
+ for actual, expected in zip(predictions, expected_predictions):
+ self.assertTrue(_compare_prediction_result(actual, expected))
+
+ def test_inference_runner_multiple_tensor_features(self):
+ examples = torch.from_numpy(
+ np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+ dtype="float32")).reshape(-1, 2)
+ examples = [
+ torch.from_numpy(np.array([1, 5], dtype="float32")),
+ torch.from_numpy(np.array([3, 10], dtype="float32")),
+ torch.from_numpy(np.array([-14, 0], dtype="float32")),
+ torch.from_numpy(np.array([0.5, 0.5], dtype="float32")),
+ ]
+ expected_predictions = [
+ PredictionResult(ex, pred) for ex,
+ pred in zip(
+ examples,
+ torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+ for f1, f2 in examples]).reshape(-1, 1))
+ ]
+
+ model = PytorchLinearRegression(input_dim=2, output_dim=1)
+ model.load_state_dict(
+ OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+ ('linear.bias', torch.Tensor([0.5]))]))
+ model.eval()
+
+ inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+ predictions = inference_runner.run_inference(examples, model)
+ for actual, expected in zip(predictions, expected_predictions):
+ self.assertTrue(_compare_prediction_result(actual, expected))
+
+ def test_num_bytes(self):
+ inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+ examples = torch.from_numpy(
+ np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+ dtype="float32")).reshape(-1, 2)
+ self.assertEqual((examples[0].element_size()) * 8,
+ inference_runner.get_num_bytes(examples))
+
+ def test_namespace(self):
+ inference_runner = PytorchInferenceRunner(torch.device('cpu'))
+ self.assertEqual(
+ 'RunInferencePytorch', inference_runner.get_metrics_namespace())
+
+ def test_pipeline_local_model(self):
+ with TestPipeline() as pipeline:
+ examples = torch.from_numpy(
+ np.array([1, 5, 3, 10, -14, 0, 0.5, 0.5],
+ dtype="float32")).reshape(-1, 2)
+ expected_predictions = [
+ PredictionResult(ex, pred) for ex,
+ pred in zip(
+ examples,
+ torch.Tensor([f1 * 2.0 + f2 * 3 + 0.5
+ for f1, f2 in examples]).reshape(-1, 1))
+ ]
+
+ state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0, 3]])),
+ ('linear.bias', torch.Tensor([0.5]))])
+ path = os.path.join(self.tmpdir, 'my_state_dict_path')
+ torch.save(state_dict, path)
+
+ model_loader = PytorchModelLoader(
+ state_dict_path=path,
+ model_class=PytorchLinearRegression(input_dim=2, output_dim=1))
+
+ pcoll = pipeline | 'start' >> beam.Create(examples)
+ predictions = pcoll | RunInference(model_loader)
+ assert_that(
+ predictions,
+ equal_to(expected_predictions, equals_fn=_compare_prediction_result))
+
+ def test_pipeline_local_model_with_key(self):
+ with TestPipeline() as pipeline:
+ examples = torch.from_numpy(
+ np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+ keyed_examples = list(zip(range(len(examples)), examples))
+ expected_values = [
+ PredictionResult(ex, pred) for ex,
+ pred in zip(
+ examples,
+ torch.Tensor([example * 2.0 + 0.5
+ for example in examples]).reshape(-1, 1))
+ ]
+ expected_predictions = list(zip(range(len(examples)), expected_values))
+
+ state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
+ ('linear.bias', torch.Tensor([0.5]))])
+ path = os.path.join(self.tmpdir, 'my_state_dict_path')
+ torch.save(state_dict, path)
+
+ model_loader = PytorchModelLoader(
+ state_dict_path=path,
+ model_class=PytorchLinearRegression(input_dim=1, output_dim=1))
+
+ pcoll = pipeline | 'start' >> beam.Create(keyed_examples)
+ predictions = pcoll | RunInference(model_loader)
+
+ def _compare_keyed_prediction_result(a, b):
+ key_equal = a[0] == b[0]
+ return (
+ torch.equal(a[1].inference, b[1].inference) and
+ torch.equal(a[1].example, b[1].example) and key_equal)
+
+ assert_that(
+ predictions,
+ equal_to(
+ expected_predictions,
equals_fn=_compare_keyed_prediction_result))
+
+ def test_pipeline_gcs_model(self):
+ with TestPipeline() as pipeline:
+ examples = torch.from_numpy(
+ np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
+ expected_predictions = [
+ PredictionResult(ex, pred) for ex,
+ pred in zip(
+ examples,
+ torch.Tensor([example * 2.0 + 0.5
+ for example in examples]).reshape(-1, 1))
+ ]
+
+ gs_pth =
'gs://apache-beam-ml/pytorch_lin_reg_model_2x+0.5_state_dict.pth'
Review Comment:
You're probably right about separating out the E2E tests. I was thinking,
though, that this should be small enough to verify the usage of the
`FileSystems` module. Perhaps I can break it out when we start adding the E2E
testing file.
Issue Time Tracking
-------------------
Worklog Id: (was: 758069)
Time Spent: 3h 20m (was: 3h 10m)
> Implement RunInference for PyTorch
> ----------------------------------
>
> Key: BEAM-13984
> URL: https://issues.apache.org/jira/browse/BEAM-13984
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-core
> Reporter: Andy Ye
> Assignee: Andy Ye
> Priority: P2
> Labels: run-inference
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Implement RunInference for PyTorch as described in the design doc
> [https://s.apache.org/inference-sklearn-pytorch]
> There will be a pytorch_impl.py file that contains PyTorchModelLoader and
> PyTorchInferenceRunner classes.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)