yeandy commented on code in PR #22131:
URL: https://github.com/apache/beam/pull/22131#discussion_r922193026


##########
sdks/python/apache_beam/ml/inference/tensorrt_inference_test.py:
##########
@@ -0,0 +1,218 @@
+#
+# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
+# Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pytest
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
+
+# Protect against environments where TensorRT python library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import tensorrt as trt
+  from apache_beam.ml.inference.base import PredictionResult, RunInference
+  from apache_beam.ml.inference.tensorrt_inference import \
+      TensorRTEngineHandlerNumPy
+except ImportError:
+  raise unittest.SkipTest('TensorRT dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+SINGLE_FEATURE_EXAMPLES = np.array([[1], [5], [-3], [10.0]], dtype=np.float32)

Review Comment:
   ```suggestion
   SINGLE_FEATURE_EXAMPLES = [np.array(1, dtype=np.float32), np.array(5, 
dtype=np.float32), np.array(-3, dtype=np.float32), np.array(10.0, 
dtype=np.float32)]
   ```
   
   `sklearn_inference_test.py` defines test inputs like 
[this](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py#L135).
 



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference.py:
##########
@@ -0,0 +1,245 @@
+#
+# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
+# Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+import numpy as np
+from cuda import cuda
+import sys
+import tensorrt as trt
+from typing import Any, Dict, Iterable, Optional, Sequence, Tuple
+
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import ModelHandler, PredictionResult
+
+TRT_LOGGER = trt.Logger(trt.Logger.INFO)
+
+logging.basicConfig(level=logging.INFO)
+logging.getLogger("TensorRTEngineHandlerNumPy").setLevel(logging.INFO)
+log = logging.getLogger("TensorRTEngineHandlerNumPy")
+
+def _load_engine(engine_path):
+  file = FileSystems.open(engine_path, 'rb')
+  runtime = trt.Runtime(TRT_LOGGER)
+  engine = runtime.deserialize_cuda_engine(file.read())
+  assert engine
+  return engine
+
+
+def _load_onnx(onnx_path):
+  builder = trt.Builder(TRT_LOGGER)
+  network = builder.create_network(
+      flags=1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
+  parser = trt.OnnxParser(network, TRT_LOGGER)
+  with FileSystems.open(onnx_path) as f:
+    if not parser.parse(f.read()):
+      log.error("Failed to load ONNX file: {}".format(onnx_path))
+      for error in range(parser.num_errors):
+        log.error(parser.get_error(error))
+      sys.exit(1)
+  return network, builder
+
+
+def _build_engine(network, builder):
+  config = builder.create_builder_config()
+  runtime = trt.Runtime(TRT_LOGGER)
+  plan = builder.build_serialized_network(network, config)
+  engine = runtime.deserialize_cuda_engine(plan)
+  builder.reset()
+  return engine
+
+
+def _validate_inference_args(inference_args):
+  """Confirms that inference_args is None.
+
+  TensorRT engines do not need extra arguments in their execute_v2() call.
+  However, since inference_args is an argument in the RunInference interface,
+  we want to make sure it is not passed here in TensorRT's implementation of
+  RunInference.
+  """
+  if inference_args:
+    raise ValueError(
+        'inference_args were provided, but should be None because TensorRT '
+        'engines do not need extra arguments in their execute_v2() call.')
+
+
+class TensorRTEngine:
+  def __init__(self, engine: trt.ICudaEngine):
+    """Implementation of the TensorRTEngine class which handles
+    allocations associated with TensorRT engine.
+
+    Example Usage::
+
+    TensorRTEngine(engine)
+
+    Args:
+      engine: trt.ICudaEngine object that contains TensorRT engine
+    """
+    self.engine = engine
+    self.context = engine.create_execution_context()
+    self.inputs = []
+    self.outputs = []
+    self.gpu_allocations = []
+    self.cpu_allocations = []
+
+    # Setup I/O bindings
+    for i in range(self.engine.num_bindings):
+      is_input = False
+      if self.engine.binding_is_input(i):
+        is_input = True
+      name = self.engine.get_binding_name(i)
+      dtype = self.engine.get_binding_dtype(i)
+      shape = self.engine.get_binding_shape(i)
+      if is_input:
+        batch_size = shape[0]
+      size = np.dtype(trt.nptype(dtype)).itemsize
+      for s in shape:
+        size *= s
+      err, allocation = cuda.cuMemAlloc(size)
+      binding = {
+          'index': i,
+          'name': name,
+          'dtype': np.dtype(trt.nptype(dtype)),
+          'shape': list(shape),
+          'allocation': allocation,
+          'size': size
+      }
+      self.gpu_allocations.append(allocation)
+      if self.engine.binding_is_input(i):
+        self.inputs.append(binding)
+      else:
+        self.outputs.append(binding)
+    
+    assert self.context
+    assert batch_size > 0
+    assert len(self.inputs) > 0
+    assert len(self.outputs) > 0
+    assert len(self.gpu_allocations) > 0
+
+    for output in self.outputs:
+      self.cpu_allocations.append(np.zeros(output['shape'], output['dtype']))
+
+  def get_engine_attrs(self):
+    """Returns TensorRT engine attributes."""
+    return self.engine, self.context, self.inputs, self.outputs, 
self.gpu_allocations, self.cpu_allocations
+
+
+class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray,
+                                              PredictionResult,
+                                              TensorRTEngine]):
+  def __init__(self, min_batch_size: int, max_batch_size: int, **kwargs):
+    """Implementation of the ModelHandler interface for TensorRT.
+
+    Example Usage::
+
+    pcoll | RunInference(
+        TensorRTEngineHandlerNumPy(
+          min_batch_size=1,
+          max_batch_size=1,
+          engine_path="my_uri"))
+
+    Args:
+      min_batch_size: minimum accepted batch size.
+      max_batch_size: maximum accepted batch size.
+      kwargs: Additional arguments like 'engine_path' and 'onnx_path' are
+      currently supported.
+
+    See https://docs.nvidia.com/deeplearning/tensorrt/api/python_api/
+    for details
+    """
+    self.min_batch_size = min_batch_size
+    self.max_batch_size = max_batch_size
+    if 'engine_path' in kwargs:
+      self.engine_path = kwargs.get('engine_path')
+    elif 'onnx_path' in kwargs:
+      self.onnx_path = kwargs.get('onnx_path')
+
+    trt.init_libnvinfer_plugins(TRT_LOGGER, namespace="")
+
+  def batch_elements_kwargs(self):
+    """Sets min_batch_size and max_batch_size of a TensorRT engine."""
+    return {
+        'min_batch_size': self.min_batch_size,
+        'max_batch_size': self.max_batch_size
+    }
+
+  def load_model(self) -> TensorRTEngine:
+    """Loads and initializes a TensorRT engine for processing."""
+    engine = _load_engine(self.engine_path)
+    return TensorRTEngine(engine)
+
+  def load_onnx(self) -> Tuple[trt.INetworkDefinition, trt.Builder]:
+    """Loads and parses an onnx model for processing."""
+    return _load_onnx(self.onnx_path)
+
+  def build_engine(self, network: trt.INetworkDefinition, builder: 
trt.Builder) -> TensorRTEngine:
+    """Build an engine according to parsed/created network."""
+    engine = _build_engine(network, builder)
+    return TensorRTEngine(engine)
+
+  def run_inference(
+      self,
+      batch: np.ndarray,
+      engine: TensorRTEngine,
+      inference_args: Optional[Dict[str, Any]] = None
+  ) -> Iterable[PredictionResult]:

Review Comment:
   I'm seeing this lint error:
   ```
   19:21:43 apache_beam/ml/inference/tensorrt_inference.py:196: error: Argument 
1 of "run_inference" is incompatible with supertype "ModelHandler"; supertype 
defines the argument type as "Sequence[ndarray[Any, Any]]"  [override]
   19:21:43 apache_beam/ml/inference/tensorrt_inference.py:196: note: This 
violates the Liskov substitution principle
   19:21:43 apache_beam/ml/inference/tensorrt_inference.py:196: note: See 
https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
   ```
   I think we need to adhere to the 
[interface](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/base.py#L91)
 for `run_inference` as found in `base.py`. Could we make it 
`Sequence[np.ndarray]` like it's done in 
[sklearn_inference.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/sklearn_inference.py#L105)?
 
   
   This may require some refactor some of the tests. I think particularly how 
the input examples are defined



##########
sdks/python/apache_beam/ml/inference/tensorrt_inference_test.py:
##########
@@ -0,0 +1,218 @@
+#
+# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
+# Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import numpy as np
+import pytest
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that, equal_to
+
+# Protect against environments where TensorRT python library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
+try:
+  import tensorrt as trt
+  from apache_beam.ml.inference.base import PredictionResult, RunInference
+  from apache_beam.ml.inference.tensorrt_inference import \
+      TensorRTEngineHandlerNumPy
+except ImportError:
+  raise unittest.SkipTest('TensorRT dependencies are not installed')
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  GCSFileSystem = None  # type: ignore
+
+LOGGER = trt.Logger(trt.Logger.INFO)
+
+SINGLE_FEATURE_EXAMPLES = np.array([[1], [5], [-3], [10.0]], dtype=np.float32)
+
+SINGLE_FEATURE_PREDICTIONS = [
+    PredictionResult(ex, pred) for ex,
+    pred in zip(
+        SINGLE_FEATURE_EXAMPLES,
+        [[np.array(example * 2.0 + 0.5, dtype=np.float32)] for example in 
SINGLE_FEATURE_EXAMPLES])
+]
+
+TWO_FEATURES_EXAMPLES = np.array([[1, 5],[3, 10],[-14, 0],[0.5, 0.5]], 
dtype=np.float32)

Review Comment:
   Same as above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to