gemini-code-assist[bot] commented on code in PR #36309: URL: https://github.com/apache/beam/pull/36309#discussion_r2423302115
########## sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact_test.py: ########## @@ -0,0 +1,326 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +import os +import tempfile +import unittest + +import numpy as np +import pytest + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that + +# Protect against environments where TensorRT python library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from apache_beam.ml.inference.base import PredictionResult, RunInference + from apache_beam.ml.inference.trt_handler_numpy_compact import ( + TensorRTEngine, + TensorRTEngineHandlerNumPy, + ) +except ImportError: + raise unittest.SkipTest('TensorRT 10.x dependencies are not installed') + +try: + from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem +except ImportError: + GCSFileSystem = None # type: ignore + +# Test data +SINGLE_FEATURE_EXAMPLES = np.array([[1.0], [5.0], [-3.0], [10.0]], + dtype=np.float32) + +SINGLE_FEATURE_PREDICTIONS = [ + PredictionResult( + SINGLE_FEATURE_EXAMPLES[i], [ + np.array([SINGLE_FEATURE_EXAMPLES[i][0] * 2.0 + 0.5], + dtype=np.float32) + ]) for i in range(len(SINGLE_FEATURE_EXAMPLES)) +] + +TWO_FEATURES_EXAMPLES = np.array([[1, 5], [3, 10], [-14, 0], [0.5, 0.5]], + dtype=np.float32) + +TWO_FEATURES_PREDICTIONS = [ + PredictionResult( + TWO_FEATURES_EXAMPLES[i], + [ + np.array([ + TWO_FEATURES_EXAMPLES[i][0] * 2.0 + + TWO_FEATURES_EXAMPLES[i][1] * 3 + 0.5 + ], + dtype=np.float32) + ]) for i in range(len(TWO_FEATURES_EXAMPLES)) +] Review Comment:  The test data variables `SINGLE_FEATURE_EXAMPLES`, `SINGLE_FEATURE_PREDICTIONS`, `TWO_FEATURES_EXAMPLES`, and `TWO_FEATURES_PREDICTIONS` are defined but not used in any of the tests. Consider removing them to improve code clarity. If they are intended for future tests, they could be moved to those tests or commented out with a TODO. ########## sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact.py: ########## @@ -0,0 +1,540 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import logging +import threading +from typing import Any +from typing import Dict +from typing import Iterable +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple + +import numpy as np + +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference import utils +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult + +LOGGER = logging.getLogger("TensorRTEngineHandlerNumPy_TRT10") + +__all__ = [ + "TensorRTEngine", + "TensorRTEngineHandlerNumPy", +] + + +# --------------------------------------------------------------------- +# CUDA / TensorRT helpers +# --------------------------------------------------------------------- +def _assign_or_fail(args): + """CUDA error checking for cuda-python (Driver API).""" + from cuda import cuda # lazy import to avoid submit-time dependency + + err, *ret = args + if isinstance(err, cuda.CUresult): + if err != cuda.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"CUDA error: {err}") + else: + raise RuntimeError(f"Unknown CUDA error type: {err}") + return ret[0] if len(ret) == 1 else tuple(ret) + + +def _require_tensorrt_10() -> None: + """Assert that TensorRT 10.x Tensor API is available on this worker.""" + try: + import tensorrt as trt # noqa: F401 + except Exception as e: # pragma: no cover 
+ raise RuntimeError("TensorRT is not installed on this worker.") from e + + # TRT 10.x indicators: + # - Engine exposes the Tensor API (num_io_tensors / get_tensor_name) + # - ExecutionContext exposes execute_async_v3 / set_input_shape / + # set_tensor_address + engine_reqs = ("num_io_tensors", "get_tensor_name") + ctx_reqs = ("execute_async_v3", "set_input_shape", "set_tensor_address") + + missing_engine = [m for m in engine_reqs if not hasattr(trt.ICudaEngine, m)] + missing_ctx = [m for m in ctx_reqs if not hasattr(trt.IExecutionContext, m)] + + if missing_engine or missing_ctx: + raise RuntimeError( + "This handler requires TensorRT 10.x+. " + f"Missing on ICudaEngine: {missing_engine or 'OK'}, " + f"Missing on IExecutionContext: {missing_ctx or 'OK'}") + + +# --------------------------------------------------------------------- +# Engine load / build (TRT 10) +# --------------------------------------------------------------------- +def _load_engine(engine_path: str): + """Deserialize a .engine (plan) from FileSystems into a TRT engine.""" + _require_tensorrt_10() + + import tensorrt as trt + + with FileSystems.open(engine_path, "rb") as f: + blob = f.read() + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(blob) + if eng is None: + raise RuntimeError( + "Failed to deserialize TensorRT engine. 
" + "The plan may be corrupt or built with an incompatible TRT.") + return eng + + +def _load_onnx_build_engine(onnx_path: str): + """Parse ONNX and build a TRT engine immediately (Tensor API).""" + if onnx_path.lower().endswith(".engine"): + raise ValueError( + "Provided onnx_path points to .engine; pass it as engine_path instead.") + + _require_tensorrt_10() + import tensorrt as trt + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + + builder = trt.Builder(logger) + flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + network = builder.create_network(flags) + parser = trt.OnnxParser(network, logger) + + with FileSystems.open(onnx_path, "rb") as f: + data = f.read() + + if not parser.parse(data): + LOGGER.error("Failed to parse ONNX: %s", onnx_path) + for i in range(parser.num_errors): + LOGGER.error(parser.get_error(i)) + raise ValueError(f"Failed to parse ONNX: {onnx_path}") + + config = builder.create_builder_config() + # Workbench: ~1GiB workspace (tune for your model/infra) + config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30) + + if getattr(builder, "platform_has_fast_fp16", False): + config.set_flag(trt.BuilderFlag.FP16) + + # Generic optimization profile for dynamic inputs. + if network.num_inputs > 0: + prof = builder.create_optimization_profile() + for i in range(network.num_inputs): + inp = network.get_input(i) + shp = list(inp.shape) + + def _d(v: int, default: int) -> int: + return default if v < 0 else v + + if len(shp) == 4: + # Assume NCHW; supply defaults where dims are dynamic. + min_shape: Tuple[int, ...] = ( + _d(shp[0], 1), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + opt_shape: Tuple[int, ...] = ( + _d(shp[0], 4), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + max_shape: Tuple[int, ...] = ( + _d(shp[0], 8), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + else: + # Fallback: make batch dynamic, keep others as-is or 1. 
+ min_shape = tuple(_d(x, 1) for x in shp) + opt_shape = tuple(_d(x, 4 if j == 0 else 1) for j, x in enumerate(shp)) + max_shape = tuple(_d(x, 8 if j == 0 else 1) for j, x in enumerate(shp)) + + prof.set_shape(inp.name, min=min_shape, opt=opt_shape, max=max_shape) + + config.add_optimization_profile(prof) + + plan = builder.build_serialized_network(network, config) + if plan is None: + raise RuntimeError( + "build_serialized_network() returned None; check ONNX and profiles.") + + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(bytes(plan)) + if eng is None: + raise RuntimeError("Failed to deserialize engine after build.") + return eng + + +# --------------------------------------------------------------------- +# Shape & batch helpers +# --------------------------------------------------------------------- +def _resolve_output_shape(shape: Sequence[int] | None, + batch_size: int) -> Tuple[int, ...] | None: + """Replace a leading -1 (batch) with batch_size; any other -1 is an error.""" + if shape is None: + return None + shp = list(shape) + if shp and shp[0] < 0: + shp[0] = int(batch_size) + if any(d < 0 for d in shp[1:]): + raise RuntimeError(f"Unresolved non-batch dims in output shape: {shape}") + return tuple(shp) + + +def _to_contiguous_batch(x: Sequence[np.ndarray] | np.ndarray) -> np.ndarray: + """Accept either an ndarray (already a batch) or a list of ndarrays. + + Concatenates on axis 0. This avoids accidental rank-5 shapes from + upstream batching. 
+ """ + if isinstance(x, np.ndarray): + return np.ascontiguousarray(x) + + if isinstance(x, (list, tuple)): + if len(x) == 1 and isinstance(x[0], np.ndarray): + return np.ascontiguousarray(x[0]) + + if all(isinstance(a, np.ndarray) for a in x): + first = x[0].shape + for a in x[1:]: + if len(a.shape) != len(first) or any( + sa != sb for sa, sb in zip(a.shape[1:], first[1:])): + raise ValueError( + f"Inconsistent element shapes for concatenation: " + f"{first} vs {a.shape}") + return np.ascontiguousarray(np.concatenate(x, axis=0)) + + raise ValueError( + "Batch must be ndarray or sequence of ndarrays of same " + "rank/shape (except batch).") + + +# --------------------------------------------------------------------- +# TRT 10.x engine wrapper (Tensor API only) +# --------------------------------------------------------------------- +class TensorRTEngine: + """TRT 10.x engine wrapper using the Tensor API.""" + def __init__(self, engine: Any): + import tensorrt as trt + + self.engine = engine + self.context = engine.create_execution_context() + self.context_lock = threading.RLock() + + # Tensor API enumeration + self.input_names: List[str] = [] + self.output_names: List[str] = [] + self.dtypes: Dict[str, np.dtype] = {} + + for i in range(engine.num_io_tensors): + name = engine.get_tensor_name(i) + mode = engine.get_tensor_mode(name) + if mode == trt.TensorIOMode.INPUT: + self.input_names.append(name) + else: + self.output_names.append(name) + self.dtypes[name] = np.dtype(trt.nptype(engine.get_tensor_dtype(name))) + + # Lazy allocations + self._device_ptrs: Dict[str, int] = {} # tensor name -> CUdeviceptr + self._host_out: Dict[str, np.ndarray] = {} + self._in_nbytes: int = 0 + self._stream: Optional[int] = None + self.profile_index: int = 0 + + def _ensure_stream(self) -> None: + if self._stream is None: + from cuda import cuda + self._stream = _assign_or_fail(cuda.cuStreamCreate(0)) + + def _free_ptr(self, ptr: Optional[int]) -> None: + if not ptr: + return + from 
cuda import cuda + try: + _assign_or_fail(cuda.cuMemFree(ptr)) + except Exception: + pass Review Comment:  The `except Exception: pass` is too broad and can hide important errors during memory deallocation. It's better to catch a more specific exception, like `RuntimeError` which is raised by `_assign_or_fail`. If the goal is to suppress errors during cleanup, consider logging them as warnings instead of silently ignoring them. ```suggestion try: _assign_or_fail(cuda.cuMemFree(ptr)) except RuntimeError as e: LOGGER.warning('Failed to free CUDA memory pointer: %s', e) ``` ########## sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact_test.py: ########## @@ -0,0 +1,326 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +import os +import tempfile +import unittest + +import numpy as np +import pytest + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that + +# Protect against environments where TensorRT python library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from apache_beam.ml.inference.base import PredictionResult, RunInference + from apache_beam.ml.inference.trt_handler_numpy_compact import ( + TensorRTEngine, + TensorRTEngineHandlerNumPy, + ) +except ImportError: + raise unittest.SkipTest('TensorRT 10.x dependencies are not installed') + +try: + from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem +except ImportError: + GCSFileSystem = None # type: ignore + +# Test data +SINGLE_FEATURE_EXAMPLES = np.array([[1.0], [5.0], [-3.0], [10.0]], + dtype=np.float32) + +SINGLE_FEATURE_PREDICTIONS = [ + PredictionResult( + SINGLE_FEATURE_EXAMPLES[i], [ + np.array([SINGLE_FEATURE_EXAMPLES[i][0] * 2.0 + 0.5], + dtype=np.float32) + ]) for i in range(len(SINGLE_FEATURE_EXAMPLES)) +] + +TWO_FEATURES_EXAMPLES = np.array([[1, 5], [3, 10], [-14, 0], [0.5, 0.5]], + dtype=np.float32) + +TWO_FEATURES_PREDICTIONS = [ + PredictionResult( + TWO_FEATURES_EXAMPLES[i], + [ + np.array([ + TWO_FEATURES_EXAMPLES[i][0] * 2.0 + + TWO_FEATURES_EXAMPLES[i][1] * 3 + 0.5 + ], + dtype=np.float32) + ]) for i in range(len(TWO_FEATURES_EXAMPLES)) +] + + +def _compare_prediction_result(a, b): + """Compare two PredictionResult objects.""" + return ((a.example == b.example).all() and all( + np.array_equal(actual, expected) + for actual, expected in zip(a.inference, b.inference))) + + +def _build_simple_onnx_model(input_size=1, output_path=None): + """Build a simple ONNX model for testing: y = 2x + 0.5""" + try: + from onnx import helper, TensorProto + except ImportError: + raise unittest.SkipTest('ONNX dependencies are not installed') + + # Create a simple linear model: y = 2*x + 0.5 + input_tensor = helper.make_tensor_value_info( + 'input', TensorProto.FLOAT, [None, input_size]) + output_tensor = helper.make_tensor_value_info( + 'output', TensorProto.FLOAT, [None, input_size]) + + # Weight tensor: 2.0 + weight_init = helper.make_tensor( + 'weight', + 
TensorProto.FLOAT, [input_size, input_size], + [2.0] * (input_size * input_size)) + # Bias tensor: 0.5 + bias_init = helper.make_tensor( + 'bias', TensorProto.FLOAT, [input_size], [0.5] * input_size) + + # MatMul node + matmul_node = helper.make_node('MatMul', ['input', 'weight'], ['matmul_out']) + # Add node + add_node = helper.make_node('Add', ['matmul_out', 'bias'], ['output']) + + # Create graph + graph = helper.make_graph([matmul_node, add_node], + 'simple_linear', [input_tensor], [output_tensor], + [weight_init, bias_init]) + + # Create model + model = helper.make_model(graph, producer_name='trt_test') + model.opset_import[0].version = 13 + + if output_path: + with open(output_path, 'wb') as f: + f.write(model.SerializeToString()) + + return model + + +@pytest.mark.uses_tensorrt +class TensorRTEngineHandlerNumPyTest(unittest.TestCase): + """Tests for TensorRTEngineHandlerNumPy with TensorRT 10.x Tensor API.""" + def test_handler_initialization(self): + """Test that handler initializes correctly with required parameters.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + self.assertEqual(handler.min_batch_size, 1) + self.assertEqual(handler.max_batch_size, 4) + self.assertEqual(handler.engine_path, '/tmp/test.engine') + + def test_handler_initialization_both_paths_error(self): + """Test that providing both engine_path and onnx_path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + onnx_path='/tmp/test.onnx') + + def test_handler_initialization_no_path_error(self): + """Test that providing neither engine_path nor onnx_path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy(min_batch_size=1, max_batch_size=4) + + def test_handler_initialization_invalid_engine_path(self): + """Test that providing a non-.engine path raises an error.""" + with 
self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.onnx') + + def test_handler_initialization_invalid_onnx_path(self): + """Test that providing a .engine path as onnx_path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, onnx_path='/tmp/test.engine') + + def test_batch_elements_kwargs(self): + """Test that batch_elements_kwargs returns correct values.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=2, + max_batch_size=8, + max_batch_duration_secs=10, + engine_path='/tmp/test.engine') + kwargs = handler.batch_elements_kwargs() + self.assertEqual(kwargs['min_batch_size'], 2) + self.assertEqual(kwargs['max_batch_size'], 8) + self.assertEqual(kwargs['max_batch_duration_secs'], 10) + + def test_get_num_bytes_ndarray(self): + """Test get_num_bytes with numpy ndarray.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + batch = np.array([[1, 2], [3, 4]], dtype=np.float32) + num_bytes = handler.get_num_bytes(batch) + self.assertEqual(num_bytes, batch.nbytes) + + def test_get_num_bytes_list(self): + """Test get_num_bytes with list of ndarrays.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + batch = [ + np.array([1, 2], dtype=np.float32), np.array([3, 4], dtype=np.float32) + ] + num_bytes = handler.get_num_bytes(batch) + expected = sum(a.nbytes for a in batch) + self.assertEqual(num_bytes, expected) + + def test_get_metrics_namespace(self): + """Test that metrics namespace is correct.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + self.assertEqual(handler.get_metrics_namespace(), 'BeamML_TensorRT10') + + def test_share_model_across_processes(self): + """Test share_model_across_processes flag.""" + handler = 
TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + large_model=True) + self.assertTrue(handler.share_model_across_processes()) + + def test_model_copies(self): + """Test model_copies parameter.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + model_copies=3) + self.assertEqual(handler.model_copies(), 3) + + @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed') Review Comment:  The tests `test_inference_with_onnx_build_on_worker`, `test_engine_initialization` (line 264), and `test_pipeline_with_onnx_model` (line 292) are skipped if `GCSFileSystem` is not available. However, these tests use local temporary files and do not seem to depend on GCS. The `FileSystems.open` call in the implementation will use `LocalFileSystem` for local paths. The `skipIf` condition seems unnecessary and could be removed from these tests. ########## sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact.py: ########## @@ -0,0 +1,540 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import logging +import threading +from typing import Any +from typing import Dict +from typing import Iterable +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple + +import numpy as np + +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference import utils +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult + +LOGGER = logging.getLogger("TensorRTEngineHandlerNumPy_TRT10") + +__all__ = [ + "TensorRTEngine", + "TensorRTEngineHandlerNumPy", +] + + +# --------------------------------------------------------------------- +# CUDA / TensorRT helpers +# --------------------------------------------------------------------- +def _assign_or_fail(args): + """CUDA error checking for cuda-python (Driver API).""" + from cuda import cuda # lazy import to avoid submit-time dependency + + err, *ret = args + if isinstance(err, cuda.CUresult): + if err != cuda.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"CUDA error: {err}") + else: + raise RuntimeError(f"Unknown CUDA error type: {err}") + return ret[0] if len(ret) == 1 else tuple(ret) + + +def _require_tensorrt_10() -> None: + """Assert that TensorRT 10.x Tensor API is available on this worker.""" + try: + import tensorrt as trt # noqa: F401 + except Exception as e: # pragma: no cover + raise RuntimeError("TensorRT is not installed on this worker.") from e + + # TRT 10.x indicators: + # - Engine exposes the Tensor API (num_io_tensors / get_tensor_name) + # - ExecutionContext exposes execute_async_v3 / set_input_shape / + # set_tensor_address + engine_reqs = ("num_io_tensors", "get_tensor_name") + ctx_reqs = ("execute_async_v3", "set_input_shape", "set_tensor_address") + + missing_engine = [m for m in engine_reqs if not hasattr(trt.ICudaEngine, m)] + missing_ctx = [m for m in ctx_reqs if not hasattr(trt.IExecutionContext, m)] + + if missing_engine or 
missing_ctx: + raise RuntimeError( + "This handler requires TensorRT 10.x+. " + f"Missing on ICudaEngine: {missing_engine or 'OK'}, " + f"Missing on IExecutionContext: {missing_ctx or 'OK'}") + + +# --------------------------------------------------------------------- +# Engine load / build (TRT 10) +# --------------------------------------------------------------------- +def _load_engine(engine_path: str): + """Deserialize a .engine (plan) from FileSystems into a TRT engine.""" + _require_tensorrt_10() + + import tensorrt as trt + + with FileSystems.open(engine_path, "rb") as f: + blob = f.read() + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(blob) + if eng is None: + raise RuntimeError( + "Failed to deserialize TensorRT engine. " + "The plan may be corrupt or built with an incompatible TRT.") + return eng + + +def _load_onnx_build_engine(onnx_path: str): + """Parse ONNX and build a TRT engine immediately (Tensor API).""" + if onnx_path.lower().endswith(".engine"): + raise ValueError( + "Provided onnx_path points to .engine; pass it as engine_path instead.") + + _require_tensorrt_10() + import tensorrt as trt + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + + builder = trt.Builder(logger) + flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + network = builder.create_network(flags) + parser = trt.OnnxParser(network, logger) + + with FileSystems.open(onnx_path, "rb") as f: + data = f.read() + + if not parser.parse(data): + LOGGER.error("Failed to parse ONNX: %s", onnx_path) + for i in range(parser.num_errors): + LOGGER.error(parser.get_error(i)) + raise ValueError(f"Failed to parse ONNX: {onnx_path}") + + config = builder.create_builder_config() + # Workbench: ~1GiB workspace (tune for your model/infra) + config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30) + + if getattr(builder, 
"platform_has_fast_fp16", False): + config.set_flag(trt.BuilderFlag.FP16) + + # Generic optimization profile for dynamic inputs. + if network.num_inputs > 0: + prof = builder.create_optimization_profile() + for i in range(network.num_inputs): + inp = network.get_input(i) + shp = list(inp.shape) + + def _d(v: int, default: int) -> int: + return default if v < 0 else v + + if len(shp) == 4: + # Assume NCHW; supply defaults where dims are dynamic. + min_shape: Tuple[int, ...] = ( + _d(shp[0], 1), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + opt_shape: Tuple[int, ...] = ( + _d(shp[0], 4), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + max_shape: Tuple[int, ...] = ( + _d(shp[0], 8), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + else: + # Fallback: make batch dynamic, keep others as-is or 1. + min_shape = tuple(_d(x, 1) for x in shp) + opt_shape = tuple(_d(x, 4 if j == 0 else 1) for j, x in enumerate(shp)) + max_shape = tuple(_d(x, 8 if j == 0 else 1) for j, x in enumerate(shp)) + + prof.set_shape(inp.name, min=min_shape, opt=opt_shape, max=max_shape) + + config.add_optimization_profile(prof) + + plan = builder.build_serialized_network(network, config) + if plan is None: + raise RuntimeError( + "build_serialized_network() returned None; check ONNX and profiles.") + + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(bytes(plan)) + if eng is None: + raise RuntimeError("Failed to deserialize engine after build.") + return eng + + +# --------------------------------------------------------------------- +# Shape & batch helpers +# --------------------------------------------------------------------- +def _resolve_output_shape(shape: Sequence[int] | None, + batch_size: int) -> Tuple[int, ...] 
| None: + """Replace a leading -1 (batch) with batch_size; any other -1 is an error.""" + if shape is None: + return None + shp = list(shape) + if shp and shp[0] < 0: + shp[0] = int(batch_size) + if any(d < 0 for d in shp[1:]): + raise RuntimeError(f"Unresolved non-batch dims in output shape: {shape}") + return tuple(shp) + + +def _to_contiguous_batch(x: Sequence[np.ndarray] | np.ndarray) -> np.ndarray: + """Accept either an ndarray (already a batch) or a list of ndarrays. + + Concatenates on axis 0. This avoids accidental rank-5 shapes from + upstream batching. + """ + if isinstance(x, np.ndarray): + return np.ascontiguousarray(x) + + if isinstance(x, (list, tuple)): + if len(x) == 1 and isinstance(x[0], np.ndarray): + return np.ascontiguousarray(x[0]) + + if all(isinstance(a, np.ndarray) for a in x): + first = x[0].shape + for a in x[1:]: + if len(a.shape) != len(first) or any( + sa != sb for sa, sb in zip(a.shape[1:], first[1:])): + raise ValueError( + f"Inconsistent element shapes for concatenation: " + f"{first} vs {a.shape}") + return np.ascontiguousarray(np.concatenate(x, axis=0)) + + raise ValueError( + "Batch must be ndarray or sequence of ndarrays of same " + "rank/shape (except batch).") + + +# --------------------------------------------------------------------- +# TRT 10.x engine wrapper (Tensor API only) +# --------------------------------------------------------------------- +class TensorRTEngine: + """TRT 10.x engine wrapper using the Tensor API.""" + def __init__(self, engine: Any): + import tensorrt as trt + + self.engine = engine + self.context = engine.create_execution_context() + self.context_lock = threading.RLock() + + # Tensor API enumeration + self.input_names: List[str] = [] + self.output_names: List[str] = [] + self.dtypes: Dict[str, np.dtype] = {} + + for i in range(engine.num_io_tensors): + name = engine.get_tensor_name(i) + mode = engine.get_tensor_mode(name) + if mode == trt.TensorIOMode.INPUT: + self.input_names.append(name) + else: 
+ self.output_names.append(name) + self.dtypes[name] = np.dtype(trt.nptype(engine.get_tensor_dtype(name))) + + # Lazy allocations + self._device_ptrs: Dict[str, int] = {} # tensor name -> CUdeviceptr + self._host_out: Dict[str, np.ndarray] = {} + self._in_nbytes: int = 0 + self._stream: Optional[int] = None + self.profile_index: int = 0 + + def _ensure_stream(self) -> None: + if self._stream is None: + from cuda import cuda + self._stream = _assign_or_fail(cuda.cuStreamCreate(0)) + + def _free_ptr(self, ptr: Optional[int]) -> None: + if not ptr: + return + from cuda import cuda + try: + _assign_or_fail(cuda.cuMemFree(ptr)) + except Exception: + pass + + def _select_profile(self) -> None: + # Pick optimization profile (sync or async depending on TRT) + if hasattr(self.context, "set_optimization_profile_async"): + self._ensure_stream() + self.context.set_optimization_profile_async( + self.profile_index, self._stream) + elif hasattr(self.context, "set_optimization_profile"): + self.context.set_optimization_profile(self.profile_index) + + def _check_shape_in_profile(self, name: str, shape: Sequence[int]) -> None: + mi, _, ma = self.engine.get_tensor_profile_shape(name, self.profile_index) + + def ok(dim: int, lo: int, hi: int) -> bool: + if lo < 0: + lo = dim + if hi < 0: + hi = dim + return lo <= dim <= hi + + if len(shape) != len(mi): + raise RuntimeError( + f"Input '{name}' rank mismatch: given {tuple(shape)}, " + f"profile[{self.profile_index}] min={tuple(mi)} max={tuple(ma)}") + for i, dim in enumerate(shape): + if not ok(dim, mi[i], ma[i]): + raise RuntimeError( + f"Input '{name}' dim {i}={dim} outside " + f"profile[{self.profile_index}] bounds " + f"[min={mi[i]}, max={ma[i]}]. 
Given={tuple(shape)}, " + f"min={tuple(mi)}, max={tuple(ma)}") + + def ensure_buffers( + self, + batch: np.ndarray, + input_shapes: Optional[Dict[str, Sequence[int]]] = None, + ) -> None: + """ + Validate shapes, set input shapes, (re)allocate device + host buffers, + and set tensor addresses for Tensor API execution. + """ + from cuda import cuda + + self._select_profile() + + # Derive shapes for inputs + shapes: Dict[str, List[int]] = {} + if len(self.input_names) == 1: + shapes[self.input_names[0]] = list(batch.shape) + else: + if not input_shapes: + raise RuntimeError( + f"Engine expects multiple inputs {self.input_names}; " + "provide shapes via " + "inference_args={'input_shapes': {name: shape, ...}}") + for name in self.input_names: + if name not in input_shapes: + raise RuntimeError(f"Missing shape for input tensor '{name}'") + shapes[name] = list(map(int, input_shapes[name])) + + # Validate and set shapes + for name, shp in shapes.items(): + self._check_shape_in_profile(name, shp) + self.context.set_input_shape(name, shp) + + # Allocate first input device buffer (we copy only this from 'batch') + in_name = self.input_names[0] + in_dtype = self.dtypes[in_name] + in_nbytes = int(np.prod(shapes[in_name])) * in_dtype.itemsize + if self._device_ptrs.get(in_name) is None or self._in_nbytes != in_nbytes: + if self._device_ptrs.get(in_name) is not None: + self._free_ptr(self._device_ptrs[in_name]) + self._device_ptrs[in_name] = _assign_or_fail(cuda.cuMemAlloc(in_nbytes)) + self._in_nbytes = in_nbytes Review Comment:  The current implementation for multi-input models appears to be incomplete. 1. While it derives shapes for multiple inputs from `inference_args['input_shapes']`, it only uses the `batch` data for the first input tensor (`self.input_names[0]`). There is no mechanism to provide data for the other input tensors. The `_trt10_inference_fn` also only copies data for the first input. 2. 
The device buffer allocation in `ensure_buffers` only happens for the first input tensor. It should iterate over all `self.input_names` and allocate buffers for each. A common pattern for multi-input models in Beam is to have `PCollection` elements as dictionaries mapping input names to numpy arrays. The `ModelHandler` and `run_inference` function would need to be adapted to handle this. For example, `ExampleT` could be `Dict[str, np.ndarray]`, and `run_inference` would receive a batch of such dictionaries. This is a significant limitation for models with multiple inputs. ########## sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact.py: ########## @@ -0,0 +1,540 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import logging +import threading +from typing import Any +from typing import Dict +from typing import Iterable +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple + +import numpy as np + +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference import utils +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult + +LOGGER = logging.getLogger("TensorRTEngineHandlerNumPy_TRT10") + +__all__ = [ + "TensorRTEngine", + "TensorRTEngineHandlerNumPy", +] + + +# --------------------------------------------------------------------- +# CUDA / TensorRT helpers +# --------------------------------------------------------------------- +def _assign_or_fail(args): + """CUDA error checking for cuda-python (Driver API).""" + from cuda import cuda # lazy import to avoid submit-time dependency + + err, *ret = args + if isinstance(err, cuda.CUresult): + if err != cuda.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"CUDA error: {err}") + else: + raise RuntimeError(f"Unknown CUDA error type: {err}") + return ret[0] if len(ret) == 1 else tuple(ret) + + +def _require_tensorrt_10() -> None: + """Assert that TensorRT 10.x Tensor API is available on this worker.""" + try: + import tensorrt as trt # noqa: F401 + except Exception as e: # pragma: no cover + raise RuntimeError("TensorRT is not installed on this worker.") from e + + # TRT 10.x indicators: + # - Engine exposes the Tensor API (num_io_tensors / get_tensor_name) + # - ExecutionContext exposes execute_async_v3 / set_input_shape / + # set_tensor_address + engine_reqs = ("num_io_tensors", "get_tensor_name") + ctx_reqs = ("execute_async_v3", "set_input_shape", "set_tensor_address") + + missing_engine = [m for m in engine_reqs if not hasattr(trt.ICudaEngine, m)] + missing_ctx = [m for m in ctx_reqs if not hasattr(trt.IExecutionContext, m)] + + if missing_engine or 
missing_ctx: + raise RuntimeError( + "This handler requires TensorRT 10.x+. " + f"Missing on ICudaEngine: {missing_engine or 'OK'}, " + f"Missing on IExecutionContext: {missing_ctx or 'OK'}") + + +# --------------------------------------------------------------------- +# Engine load / build (TRT 10) +# --------------------------------------------------------------------- +def _load_engine(engine_path: str): + """Deserialize a .engine (plan) from FileSystems into a TRT engine.""" + _require_tensorrt_10() + + import tensorrt as trt + + with FileSystems.open(engine_path, "rb") as f: + blob = f.read() + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(blob) + if eng is None: + raise RuntimeError( + "Failed to deserialize TensorRT engine. " + "The plan may be corrupt or built with an incompatible TRT.") + return eng + + +def _load_onnx_build_engine(onnx_path: str): + """Parse ONNX and build a TRT engine immediately (Tensor API).""" + if onnx_path.lower().endswith(".engine"): + raise ValueError( + "Provided onnx_path points to .engine; pass it as engine_path instead.") + + _require_tensorrt_10() + import tensorrt as trt + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + + builder = trt.Builder(logger) + flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + network = builder.create_network(flags) + parser = trt.OnnxParser(network, logger) + + with FileSystems.open(onnx_path, "rb") as f: + data = f.read() + + if not parser.parse(data): + LOGGER.error("Failed to parse ONNX: %s", onnx_path) + for i in range(parser.num_errors): + LOGGER.error(parser.get_error(i)) + raise ValueError(f"Failed to parse ONNX: {onnx_path}") + + config = builder.create_builder_config() + # Workbench: ~1GiB workspace (tune for your model/infra) + config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30) Review Comment:  The workspace size for the 
ONNX builder is hardcoded to 1 GiB (`1 << 30` bytes). While the comment suggests tuning it, it would be more flexible for users if this could be configured through the `TensorRTEngineHandlerNumPy` constructor. Consider adding a parameter like `onnx_builder_config_args` to `__init__` that can be used to pass arguments to the TensorRT builder config, for example to set the memory pool limit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
