yeandy commented on code in PR #22250: URL: https://github.com/apache/beam/pull/22250#discussion_r921113109
########## sdks/python/apache_beam/examples/snippets/transforms/elementwise/runinference_test.py: ########## @@ -0,0 +1,93 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +import unittest +from io import StringIO + +import mock + +from apache_beam.examples.snippets.util import assert_matches_stdout +from apache_beam.testing.test_pipeline import TestPipeline + +from . 
import runinference + +def check_torch_keyed_model_handler(actual): + expected = '''[START torch_keyed_model_handler] +('first_question', PredictionResult(example=tensor([105.]), inference=tensor([523.6982], grad_fn=<UnbindBackward0>))) +('second_question', PredictionResult(example=tensor([108.]), inference=tensor([538.5867], grad_fn=<UnbindBackward0>))) +('third_question', PredictionResult(example=tensor([1000.]), inference=tensor([4965.4019], grad_fn=<UnbindBackward0>))) +('fourth_question', PredictionResult(example=tensor([1013.]), inference=tensor([5029.9180], grad_fn=<UnbindBackward0>))) +[END torch_keyed_model_handler]'''.splitlines()[1:-1] + assert_matches_stdout(actual, expected) + + +def check_sklearn_keyed_model_handler(actual): + expected = '''[START sklearn_keyed_model_handler] +('first_question', PredictionResult(example=[105.0], inference=array([525.]))) +('second_question', PredictionResult(example=[108.0], inference=array([540.]))) +('third_question', PredictionResult(example=[1000.0], inference=array([5000.]))) +('fourth_question', PredictionResult(example=[1013.0], inference=array([5065.]))) +[END sklearn_keyed_model_handler] '''.splitlines()[1:-1] + assert_matches_stdout(actual, expected) + + +def check_torch_unkeyed_model_handler(actual): + expected = '''[START torch_unkeyed_model_handler] +PredictionResult(example=tensor([10.]), inference=tensor([52.2325], grad_fn=<UnbindBackward0>)) +PredictionResult(example=tensor([40.]), inference=tensor([201.1165], grad_fn=<UnbindBackward0>)) +PredictionResult(example=tensor([60.]), inference=tensor([300.3724], grad_fn=<UnbindBackward0>)) +PredictionResult(example=tensor([90.]), inference=tensor([449.2563], grad_fn=<UnbindBackward0>)) +[END torch_unkeyed_model_handler] '''.splitlines()[1:-1] + assert_matches_stdout(actual, expected) + + +def check_sklearn_unkeyed_model_handler(actual): + expected = '''[START sklearn_unkeyed_model_handler] +PredictionResult(example=array([20.], dtype=float32), 
inference=array([100.], dtype=float32)) +PredictionResult(example=array([40.], dtype=float32), inference=array([200.], dtype=float32)) +PredictionResult(example=array([60.], dtype=float32), inference=array([300.], dtype=float32)) +PredictionResult(example=array([90.], dtype=float32), inference=array([450.], dtype=float32)) +[END sklearn_unkeyed_model_handler] '''.splitlines()[1:-1] + assert_matches_stdout(actual, expected) + +@mock.patch('apache_beam.Pipeline', TestPipeline) +@mock.patch( + 'apache_beam.examples.snippets.transforms.elementwise.runinference.print', str) +class RunInferenceTest(unittest.TestCase): + def test_torch_unkeyed_model_handler(self): + runinference.torch_unkeyed_model_handler(check_torch_unkeyed_model_handler) + + def test_torch_keyed_model_handler(self): + runinference.torch_keyed_model_handler(check_torch_keyed_model_handler) + + def test_sklearn_unkeyed_model_handler(self): + runinference.sklearn_unkeyed_model_handler(check_sklearn_unkeyed_model_handler) + + def test_sklearn_keyed_model_handler(self): + runinference.sklearn_keyed_model_handler(check_sklearn_keyed_model_handler) + + def test_images(self): + runinference.images(check_images) + + def test_digits(self): + runinference.digits(check_digits) + Review Comment: This can be removed now? ########## sdks/python/apache_beam/examples/snippets/transforms/elementwise/runinference.py: ########## @@ -0,0 +1,157 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +def torch_unkeyed_model_handler(test=None): + # [START torch_unkeyed_model_handler] + import apache_beam as beam + import numpy + import torch + from apache_beam.ml.inference.base import RunInference + from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor + + class LinearRegression(torch.nn.Module): + def __init__(self, input_dim=1, output_dim=1): + super().__init__() + self.linear = torch.nn.Linear(input_dim, output_dim) + + def forward(self, x): + out = self.linear(x) + return out + + model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt' # pylint: disable=line-too-long + model_class = LinearRegression + model_params = {'input_dim': 1, 'output_dim': 1} + model_handler = PytorchModelHandlerTensor( + model_class=model_class, + model_params=model_params, + state_dict_path=model_state_dict_path) + + unkeyed_data = numpy.array([10, 40, 60, 90], + dtype=numpy.float32).reshape(-1, 1) + + with beam.Pipeline() as p: + predictions = ( + p + | 'InputData' >> beam.Create(unkeyed_data) + | 'ConvertNumpyToTensor' >> beam.Map(torch.Tensor) + | 'PytorchRunInference' >> RunInference(model_handler=model_handler) + | beam.Map(print)) + # [END torch_unkeyed_model_handler] + if test: + test(predictions) + + +def torch_keyed_model_handler(test=None): + # [START torch_keyed_model_handler] + import apache_beam as beam + import torch + from apache_beam.ml.inference.base import KeyedModelHandler + from apache_beam.ml.inference.base import RunInference + from 
apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor + + class LinearRegression(torch.nn.Module): + def __init__(self, input_dim=1, output_dim=1): + super().__init__() + self.linear = torch.nn.Linear(input_dim, output_dim) + + def forward(self, x): + out = self.linear(x) + return out + + model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt' # pylint: disable=line-too-long + model_class = LinearRegression + model_params = {'input_dim': 1, 'output_dim': 1} + keyed_model_handler = KeyedModelHandler( + PytorchModelHandlerTensor( + model_class=model_class, + model_params=model_params, + state_dict_path=model_state_dict_path)) + + keyed_data = [("first_question", 105.00), ("second_question", 108.00), + ("third_question", 1000.00), ("fourth_question", 1013.00)] + + with beam.Pipeline() as p: + predictions = ( + p + | 'KeyedInputData' >> beam.Create(keyed_data) + | "ConvertIntToTensor" >> + beam.Map(lambda x: (x[0], torch.Tensor([x[1]]))) + | 'PytorchRunInference' >> + RunInference(model_handler=keyed_model_handler) + | beam.Map(print)) + # [END torch_keyed_model_handler] + if test: + test(predictions) + + +def sklearn_unkeyed_model_handler(test=None): + # [START sklearn_unkeyed_model_handler] + import apache_beam as beam + import numpy + from apache_beam.ml.inference.base import RunInference + from apache_beam.ml.inference.sklearn_inference import ModelFileType + from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy + + sklearn_model_filename = 'gs://apache-beam-samples/run_inference/five_times_table_sklearn.pkl' # pylint: disable=line-too-long Review Comment: Can't find this filesystem in the unit tests. 
Can you double check this @AnandInguva ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. 
The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements). + +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(<model_handler>) +``` +Where `model_handler` is the model handler setup code. + +To import models, you need to wrap them around a `ModelHandler` object. Which `ModelHandler` you import depends on the framework and type of data structure that contains the inputs. 
The following examples show some ModelHandlers that you might want to import. + +``` +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +``` +### Use pre-trained models + +The section provides requirements for using pre-trained models with PyTorch and Scikit-learn + +#### PyTorch + +You need to provide a path to a file that contains the model saved weights. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps: + +1. Download the pre-trained weights and host them in a location that the pipeline can access. +2. Pass the path of the model to the PyTorch `model_handler` by using the following code: `state_dict_path=<path_to_weights>`. + +#### Scikit-learn + +You need to provide a path to a file that contains the pickled Scikit-learn model. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps: + +1. Download the pickled model class and host it in a location that the pipeline can access. +2. Pass the path of the model to the Sklearn `model_handler` by using the following code: + `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify + `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized. + +### Use multiple models + +You can also use the RunInference transform to add multiple inference models to your pipeline. 
+ +#### A/B Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = data | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. + +#### Ensemble Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') Review Comment: ```suggestion data = p | 'Read' >> beam.ReadFromSource('a_source') ``` ########## sdks/python/apache_beam/examples/snippets/transforms/elementwise/runinference.py: ########## @@ -0,0 +1,157 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# pytype: skip-file + +def torch_unkeyed_model_handler(test=None): + # [START torch_unkeyed_model_handler] + import apache_beam as beam + import numpy + import torch + from apache_beam.ml.inference.base import RunInference + from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor + + class LinearRegression(torch.nn.Module): + def __init__(self, input_dim=1, output_dim=1): + super().__init__() + self.linear = torch.nn.Linear(input_dim, output_dim) + + def forward(self, x): + out = self.linear(x) + return out + + model_state_dict_path = 'gs://apache-beam-samples/run_inference/five_times_table_torch.pt' # pylint: disable=line-too-long + model_class = LinearRegression + model_params = {'input_dim': 1, 'output_dim': 1} + model_handler = PytorchModelHandlerTensor( + model_class=model_class, + model_params=model_params, + state_dict_path=model_state_dict_path) + + unkeyed_data = numpy.array([10, 40, 60, 90], + dtype=numpy.float32).reshape(-1, 1) + + with beam.Pipeline() as p: + predictions = ( + p + | 'InputData' >> beam.Create(unkeyed_data) + | 'ConvertNumpyToTensor' >> beam.Map(torch.Tensor) + | 'PytorchRunInference' >> RunInference(model_handler=model_handler) + | beam.Map(print)) + # [END torch_unkeyed_model_handler] + if test: + test(predictions) + + +def torch_keyed_model_handler(test=None): + # [START torch_keyed_model_handler] + import apache_beam as beam + import torch Review Comment: @AnandInguva I see `ModuleNotFoundError: No module named 'torch'`. Do you know where we can install extra deps for snippet examples? ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. 
For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements). + +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') Review Comment: ```suggestion predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. 
For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements). + +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + Review Comment: ```suggestion ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements). 
+ +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(<model_handler>) +``` +Where `model_handler` is the model handler setup code. + +To import models, you need to wrap them around a `ModelHandler` object. Which `ModelHandler` you import depends on the framework and type of data structure that contains the inputs. The following examples show some ModelHandlers that you might want to import. + +``` +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +``` +### Use pre-trained models + +The section provides requirements for using pre-trained models with PyTorch and Scikit-learn + +#### PyTorch + +You need to provide a path to a file that contains the model saved weights. This path must be accessible by the pipeline. 
To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps: + +1. Download the pre-trained weights and host them in a location that the pipeline can access. +2. Pass the path of the model to the PyTorch `model_handler` by using the following code: `state_dict_path=<path_to_weights>`. + +#### Scikit-learn + +You need to provide a path to a file that contains the pickled Scikit-learn model. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps: + +1. Download the pickled model class and host it in a location that the pipeline can access. +2. Pass the path of the model to the Sklearn `model_handler` by using the following code: + `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify + `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized. + +### Use multiple models + +You can also use the RunInference transform to add multiple inference models to your pipeline. + +#### A/B Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = data | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. + +#### Ensemble Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. 
+ +### Use a keyed ModelHandler + +If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object: + +``` +from apache_beam.ml.inference.base import KeyedModelHandler + +keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...)) + Review Comment: ```suggestion ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. 
+ +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements). + +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. 
+ +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(<model_handler>)) +``` +Where `model_handler` is the model handler setup code. + +To import models, you need to wrap them around a `ModelHandler` object. Which `ModelHandler` you import depends on the framework and type of data structure that contains the inputs. The following examples show some ModelHandlers that you might want to import. + +``` +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +``` +### Use pre-trained models + +This section provides requirements for using pre-trained models with PyTorch and Scikit-learn. + +#### PyTorch + +You need to provide a path to a file that contains the model's saved weights. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps: + +1. Download the pre-trained weights and host them in a location that the pipeline can access. +2. Pass the path of the model to the PyTorch `model_handler` by using the following code: `state_dict_path=<path_to_weights>`. + +#### Scikit-learn + +You need to provide a path to a file that contains the pickled Scikit-learn model. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps: + +1. Download the pickled model class and host it in a location that the pipeline can access. +2.
Pass the path of the model to the Sklearn `model_handler` by using the following code: + `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify + `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized. + +### Use multiple models + +You can also use the RunInference transform to add multiple inference models to your pipeline. + +#### A/B Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = data | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. + +#### Ensemble Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. + +### Use a keyed ModelHandler + +If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object: + +``` +from apache_beam.ml.inference.base import KeyedModelHandler + Review Comment: ```suggestion ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements).
+ +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(<model_handler>)) +``` +Where `model_handler` is the model handler setup code. + +To import models, you need to wrap them around a `ModelHandler` object. Which `ModelHandler` you import depends on the framework and type of data structure that contains the inputs. The following examples show some ModelHandlers that you might want to import. + +``` +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +``` +### Use pre-trained models + +This section provides requirements for using pre-trained models with PyTorch and Scikit-learn. + +#### PyTorch + +You need to provide a path to a file that contains the model's saved weights. This path must be accessible by the pipeline.
To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps: + +1. Download the pre-trained weights and host them in a location that the pipeline can access. +2. Pass the path of the model to the PyTorch `model_handler` by using the following code: `state_dict_path=<path_to_weights>`. + +#### Scikit-learn + +You need to provide a path to a file that contains the pickled Scikit-learn model. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps: + +1. Download the pickled model class and host it in a location that the pipeline can access. +2. Pass the path of the model to the Sklearn `model_handler` by using the following code: + `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify + `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized. + +### Use multiple models + +You can also use the RunInference transform to add multiple inference models to your pipeline. + +#### A/B Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') Review Comment: ```suggestion data = p | 'Read' >> beam.ReadFromSource('a_source') ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. + +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements).
+ +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines. Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(<model_handler>)) +``` +Where `model_handler` is the model handler setup code. + +To import models, you need to wrap them around a `ModelHandler` object. Which `ModelHandler` you import depends on the framework and type of data structure that contains the inputs. The following examples show some ModelHandlers that you might want to import. + +``` +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +``` +### Use pre-trained models + +This section provides requirements for using pre-trained models with PyTorch and Scikit-learn. + +#### PyTorch + +You need to provide a path to a file that contains the model's saved weights. This path must be accessible by the pipeline.
To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps: + +1. Download the pre-trained weights and host them in a location that the pipeline can access. +2. Pass the path of the model to the PyTorch `model_handler` by using the following code: `state_dict_path=<path_to_weights>`. + +#### Scikit-learn + +You need to provide a path to a file that contains the pickled Scikit-learn model. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps: + +1. Download the pickled model class and host it in a location that the pipeline can access. +2. Pass the path of the model to the Sklearn `model_handler` by using the following code: + `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify + `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized. + +### Use multiple models + +You can also use the RunInference transform to add multiple inference models to your pipeline. + +#### A/B Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = data | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. + +#### Ensemble Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. 
+ +### Use a keyed ModelHandler + +If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object: + +``` +from apache_beam.ml.inference.base import KeyedModelHandler + +keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...)) + +with pipeline as p: + data = p | beam.Create([ + ('img1', torch.tensor([[1,2,3],[4,5,6],...])), + ('img2', torch.tensor([[1,2,3],[4,5,6],...])), + ('img3', torch.tensor([[1,2,3],[4,5,6],...])), + ]) + predictions = data | RunInference(keyed_model_handler) +``` + +### Use the PredictionResult object + +When doing a prediction in Apache Beam, the output `PCollection` includes both the keys of the input examples and the inferences. Including both these items in the output allows you to find the input that determined the predictions. + +The `PredictionResult` is a `NamedTuple` object that contains both the input and the inferences, named `example` and `inference`, respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform. + +``` +class PostProcessor(beam.DoFn): + def process(self, element: Tuple[str, PredictionResult]): + key, prediction_result = element + inputs = prediction_result.example + predictions = prediction_result.inference + + # Post-processing logic + result = ...
+ + yield (key, result) + +with pipeline as p: + output = ( + p | 'Read' >> beam.ReadFromSource('a_source') + | 'PyTorchRunInference' >> RunInference(<keyed_model_handler>) + | 'ProcessOutput' >> beam.ParDo(PostProcessor())) +``` + +If you need to use this object explicitly, include the following line in your pipeline to import the object: + +``` +from apache_beam.ml.inference.base import PredictionResult +``` + +For more information, see the [`PredictionResult` documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/base.py#L65). Review Comment: ```suggestion For more information, see the [`PredictionResult` documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/base.py#L65). ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -0,0 +1,198 @@ +--- +type: languages +title: "Apache Beam Python Machine Learning" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Machine Learning + +You can use Apache Beam with the RunInference API to use machine learning (ML) models to do local and remote inference with batch and streaming pipelines. Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are supported. You can create multiple types of transforms using the RunInference API: the API takes multiple types of setup parameters from model handlers, and the parameter type determines the model implementation. 
+ +## Why use the RunInference API? + +RunInference takes advantage of existing Apache Beam concepts, such as the `BatchElements` transform and the `Shared` class, to enable you to use models in your pipelines to create transforms optimized for machine learning inferences. The ability to create arbitrarily complex workflow graphs also allows you to build multi-model pipelines. + +### BatchElements PTransform + +To take advantage of the optimizations of vectorized inference that many models implement, we added the `BatchElements` transform as an intermediate step before making the prediction for the model. This transform batches elements together. The batched elements are then applied with a transformation for the particular framework of RunInference. For example, for numpy `ndarrays`, we call `numpy.stack()`, and for torch `Tensor` elements, we call `torch.stack()`. + +To customize the settings for `beam.BatchElements`, in `ModelHandler`, override the `batch_elements_kwargs` function. For example, use `min_batch_size` to set the lowest number of elements per batch or `max_batch_size` to set the highest number of elements per batch. + +For more information, see the [`BatchElements` transform documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements). + +### Shared helper class + +Instead of loading a model for each thread in the process, we use the `Shared` class, which allows us to load one model that is shared across all threads of each worker in a DoFn. For more information, see the +[`Shared` class documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20). + +### Multi-model pipelines + +The RunInference API can be composed into multi-model pipelines.
Multi-model pipelines can be useful for A/B testing or for building out ensembles that are comprised of models that perform tokenization, sentence segmentation, part-of-speech tagging, named entity extraction, language detection, coreference resolution, and more. + +## Modify a pipeline to use an ML model + +To use the RunInference transform, add the following code to your pipeline: + +``` +from apache_beam.ml.inference.base import RunInference + +with pipeline as p: + predictions = ( p | 'Read' >> beam.ReadFromSource('a_source') + | 'RunInference' >> RunInference(<model_handler>)) +``` +Where `model_handler` is the model handler setup code. + +To import models, you need to wrap them around a `ModelHandler` object. Which `ModelHandler` you import depends on the framework and type of data structure that contains the inputs. The following examples show some ModelHandlers that you might want to import. + +``` +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy +from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor +``` +### Use pre-trained models + +This section provides requirements for using pre-trained models with PyTorch and Scikit-learn. + +#### PyTorch + +You need to provide a path to a file that contains the model's saved weights. This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the PyTorch framework, complete the following steps: + +1. Download the pre-trained weights and host them in a location that the pipeline can access. +2. Pass the path of the model to the PyTorch `model_handler` by using the following code: `state_dict_path=<path_to_weights>`. + +#### Scikit-learn + +You need to provide a path to a file that contains the pickled Scikit-learn model.
This path must be accessible by the pipeline. To use pre-trained models with the RunInference API and the Scikit-learn framework, complete the following steps: + +1. Download the pickled model class and host it in a location that the pipeline can access. +2. Pass the path of the model to the Sklearn `model_handler` by using the following code: + `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, where you can specify + `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the model was serialized. + +### Use multiple models + +You can also use the RunInference transform to add multiple inference models to your pipeline. + +#### A/B Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = data | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. + +#### Ensemble Pattern + +``` +with pipeline as p: + data = p | 'Read' >> beam.ReadFromSource('a_source') + model_a_predictions = data | RunInference(<model_handler_A>) + model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>) +``` + +Where `model_handler_A` and `model_handler_B` are the model handler setup code. 
+ +### Use a keyed ModelHandler + +If a key is attached to the examples, wrap the `KeyedModelHandler` around the `ModelHandler` object: + +``` +from apache_beam.ml.inference.base import KeyedModelHandler + +keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...)) + +with pipeline as p: + data = p | beam.Create([ + ('img1', torch.tensor([[1,2,3],[4,5,6],...])), + ('img2', torch.tensor([[1,2,3],[4,5,6],...])), + ('img3', torch.tensor([[1,2,3],[4,5,6],...])), + ]) + predictions = data | RunInference(keyed_model_handler) +``` + +### Use the PredictionResult object + +When doing a prediction in Apache Beam, the output `PCollection` includes both the keys of the input examples and the inferences. Including both these items in the output allows you to find the input that determined the predictions. + +The `PredictionResult` is a `NamedTuple` object that contains both the input and the inferences, named `example` and `inference`, respectively. When keys are passed with the input data to the RunInference transform, the output `PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and the `PredictionResult` object. Your pipeline interacts with a `PredictionResult` object in steps after the RunInference transform. + +``` +class PostProcessor(beam.DoFn): + def process(self, element: Tuple[str, PredictionResult]): + key, prediction_result = element + inputs = prediction_result.example + predictions = prediction_result.inference + + # Post-processing logic + result = ... + + yield (key, result) + +with pipeline as p: + output = ( + p | 'Read' >> beam.ReadFromSource('a_source') Review Comment: ```suggestion p | 'Read' >> beam.ReadFromSource('a_source') ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
