ryanthompson591 commented on code in PR #22250:
URL: https://github.com/apache/beam/pull/22250#discussion_r921251940


##########
website/www/site/content/en/documentation/sdks/python.md:
##########
@@ -46,7 +46,13 @@ new I/O connectors. See the [Developing I/O connectors 
overview](/documentation/
 for information about developing new I/O connectors and links to
 language-specific implementation guidance.
 
-## Using Beam Python SDK in your ML pipelines
+## Making machine learning inferences with Python
+
+To integrate machine learning models into your pipelines for making 
inferences, use the RunInference API for PyTorch and Scikit-learn models. If 
you are using TensorFlow models, you can make use of the
+[library from 
`tfx_bsl`](https://github.com/tensorflow/tfx-bsl/tree/master/tfx_bsl/beam). 
Further integrations for TensorFlow are planned. For more information, see 
[Beam issue #21442](https://github.com/apache/beam/issues/21442).

Review Comment:
   You could say --- Further integrations for popular machine learning 
frameworks like Tensorflow are planned.
   
   But I'm wondering if its worth saying anything at all, since this 
documentation could end up out of date soon.



##########
website/www/site/content/en/documentation/sdks/python-machine-learning.md:
##########
@@ -0,0 +1,198 @@
+---
+type: languages
+title: "Apache Beam Python Machine Learning"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Machine Learning
+
+You can use Apache Beam with the RunInference API to use machine learning (ML) 
models to do local and remote inference with batch and streaming pipelines. 
Starting with Apache Beam 2.40.0, PyTorch and Scikit-learn frameworks are 
supported. You can create multiple types of transforms using the RunInference 
API: the API takes multiple types of setup parameters from model handlers, and 
the parameter type determines the model implementation.
+
+## Why use the RunInference API?
+
+RunInference takes advantage of existing Apache Beam concepts, such as the the 
`BatchElements` transform and the `Shared` class, to enable you to use models 
in your pipelines to create transforms optimized for machine learning 
inferences. The ability to create arbitrarily complex workflow graphs also 
allows you to build multi-model pipelines.
+
+### BatchElements PTransform
+
+To take advantage of the optimizations of vectorized inference that many 
models implement, we added the `BatchElements` transform as an intermediate 
step before making the prediction for the model. This transform batches 
elements together. The batched elements are then applied with a transformation 
for the particular framework of RunInference. For example, for numpy 
`ndarrays`, we call `numpy.stack()`,  and for torch `Tensor` elements, we call 
`torch.stack()`.
+
+To customize the settings for `beam.BatchElements`, in `ModelHandler`, 
override the `batch_elements_kwargs` function. For example, use 
`min_batch_size` to set the lowest number of elements per batch or 
`max_batch_size` to set the highest number of elements per batch.
+
+For more information, see the [`BatchElements` transform 
documentation](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements).
+
+### Shared helper class
+
+Instead of loading a model for each thread in the process, we use the `Shared` 
class, which allows us to load one model that is shared across all threads of 
each worker in a DoFn. For more information, see the
+[`Shared` class 
documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py#L20).
+
+### Multi-model pipelines
+
+The RunInference API can be composed into multi-model pipelines. Multi-model 
pipelines can be useful for A/B testing or for building out ensembles that are 
comprised of models that perform tokenization, sentence segmentation, 
part-of-speech tagging, named entity extraction, language detection, 
coreference resolution, and more.
+
+## Modify a pipeline to use an ML model
+
+To use the RunInference transform, add the following code to your pipeline:
+
+```
+from apache_beam.ml.inference.base import RunInference
+ 
+with pipeline as p:
+   predictions = ( p |  'Read' >> beam.ReadFromSource('a_source')   
+                     | 'RunInference' >> RunInference(<model_handler>)
+```
+Where `model_handler` is the model handler setup code.
+
+To import models, you need to wrap them around a `ModelHandler` object. Which 
`ModelHandler` you import depends on the framework and type of data structure 
that contains the inputs. The following examples show some ModelHandlers that 
you might want to import.
+
+```
+from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
+from apache_beam.ml.inference.sklearn_inference import 
SklearnModelHandlerPandas
+from apache_beam.ml.inference.pytorch_inference import 
PytorchModelHandlerTensor
+from apache_beam.ml.inference.pytorch_inference import 
PytorchModelHandlerKeyedTensor
+```
+### Use pre-trained models
+
+The section provides requirements for using pre-trained models with PyTorch 
and Scikit-learn
+
+#### PyTorch
+
+You need to provide a path to a file that contains the model saved weights. 
This path must be accessible by the pipeline. To use pre-trained models with 
the RunInference API and the PyTorch framework, complete the following steps:
+
+1. Download the pre-trained weights and host them in a location that the 
pipeline can access.
+2. Pass the path of the model to the PyTorch `model_handler` by using the 
following code: `state_dict_path=<path_to_weights>`.
+
+#### Scikit-learn
+
+You need to provide a path to a file that contains the pickled Scikit-learn 
model. This path must be accessible by the pipeline. To use pre-trained models 
with the RunInference API and the Scikit-learn framework, complete the 
following steps:
+
+1. Download the pickled model class and host it in a location that the 
pipeline can access.
+2. Pass the path of the model to the Sklearn `model_handler` by using the 
following code:
+   `model_uri=<path_to_pickled_file>` and `model_file_type: <ModelFileType>`, 
where you can specify
+   `ModelFileType.PICKLE` or `ModelFileType.JOBLIB`, depending on how the 
model was serialized.
+
+### Use multiple models
+
+You can also use the RunInference transform to add multiple inference models 
to your pipeline.
+
+#### A/B Pattern
+
+```
+with pipeline as p:
+   data = p | 'Read' >> beam.ReadFromSource('a_source') 
+   model_a_predictions = data | RunInference(<model_handler_A>)
+   model_b_predictions = data | RunInference(<model_handler_B>)
+```
+
+Where `model_handler_A` and `model_handler_B` are the model handler setup code.
+
+#### Ensemble Pattern
+
+```
+with pipeline as p:
+   data = p | 'Read' >> beam.ReadFromSource('a_source') 
+   model_a_predictions = data | RunInference(<model_handler_A>)
+   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) 
| RunInference(<model_handler_B>)
+```
+
+Where `model_handler_A` and `model_handler_B` are the model handler setup code.
+
+### Use a keyed ModelHandler
+
+If a key is attached to the examples, wrap the `KeyedModelHandler` around the 
`ModelHandler` object:
+
+```
+from apache_beam.ml.inference.base import KeyedModelHandler
+ 
+keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
+ 
+with pipeline as p:
+   data = p | beam.Create([
+      ('img1', torch.tensor([[1,2,3],[4,5,6],...])),
+      ('img2', torch.tensor([[1,2,3],[4,5,6],...])),
+      ('img3', torch.tensor([[1,2,3],[4,5,6],...])),
+   ])
+   predictions = data | RunInference(KeyedModelHandler)
+```
+
+### Use the PredictionResults object
+
+When doing a prediction in Apache Beam, the output `PCollection` includes both 
the keys of the input examples and the inferences. Including both these items 
in the output allows you to find the input that determined the predictions.
+
+The `PredictionResult` is a `NamedTuple` object that contains both the input 
and the inferences, named  `example` and  `inference`, respectively. When keys 
are passed with the input data to the RunInference transform, the output 
`PCollection` returns a `Tuple[str, PredictionResult]`, which is the key and 
the `PredictionResult` object. Your pipeline interacts with a 
`PredictionResult` object in steps after the RunInference transform.
+
+```
+class PostProcessor(beam.DoFn):
+    def process(self, element: Tuple[str, PredictionResult]):
+       key, prediction_result = element
+       inputs = prediction_result.example
+       predictions = prediction_result.inference
+
+       # Post-processing logic
+       result = ...
+
+       yield (key, result)
+
+with pipeline as p:
+    output = (
+        p | 'Read' >> beam.ReadFromSource('a_source') 
+                | 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
+                | 'ProcessOutput' >> beam.ParDo(PostProcessor()))
+```
+
+If you need to use this object explicitly, include the following line in your 
pipeline to import the object:
+
+```
+from apache_beam.ml.inference.base import PredictionResult
+```
+
+For more information, see the [`PredictionResult` 
documentation](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/base.py#L65).
 
+
+## Run a machine learning pipeline
+
+For detailed instructions explaining how to build and run a pipeline that uses 
ML models, see the
+[Example RunInference API 
pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference)
 on GitHub.
+
+## Troubleshooting
+
+If you run into problems with your pipeline or job, this section lists issues 
that you might encounter and provides suggestions for how to fix them.
+
+### Incorrect inferences in the PredictionResult object
+
+In some cases, the `PredictionResults` output might not include the correct 
predictions in the `inferences` field. This issue occurs when you use a model 
whose inferences return a dictionary that maps keys to predictions and other 
metadata. An example return type is `Dict[str, Tensor]`.
+
+The RunInference API currently expects outputs to be an `Iterable[Any]`. 
Example return types are `Iterable[Tensor]` or `Iterable[Dict[str, Tensor]]`. 
When RunInference zips the inputs with the predictions, the predictions iterate 
over the dictionary keys instead of the batch elements. The result is that the 
key name is preserved but the prediction tensors are discarded. For more 
information, see the [Pytorch RunInference PredictionResult is a 
Dict](https://github.com/apache/beam/issues/22240) issue in the Apache Beam 
GitHub project.
+
+To work with the current RunInference implementation, you can create a wrapper 
class that overrides the `model(input)` call. In PyTorch, for example, your 
wrapper would override the `forward()` function and return an output with the 
appropriate format of `List[Dict[str, torch.Tensor]]`. For more information, 
see our [HuggingFace language modeling 
example](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_language_modeling.py#L49).
+
+### Unable to batch tensor elements
+
+RunInference uses dynamic batching. However, the RunInference API cannot batch 
tensor elements of different sizes, because `torch.stack()` expects tensors of 
the same length. If you provide images of different sizes or word embeddings of 
different lengths, errors might occur.

Review Comment:
   Yes, that is a good generalization for sklearn as well.  Going forward it 
may not always going to be the case, TF might have some leeway with the way it 
sets up protos, but it is a good troubleshooting thing to think about in 
general.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to