rszper commented on code in PR #25947:
URL: https://github.com/apache/beam/pull/25947#discussion_r1151125750
##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+---
+title: "Auto Model Updates in RunInference Transforms using SideInputs"

Review Comment:
Maybe change this to: Auto-update ML models

I think this primarily appears in the window/tab in the browser, so shorter might be better. We usually have this match the H1 title.

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Use WatchFilePattern as side input to auto-update ML models in RunInference
+
+The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models.
+
+Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`,
+or by configuring a custom side input PCollection that defines the logic for the model update.
Review Comment:
```suggestion
or by configuring a custom side input `PCollection` that defines the logic for the model update.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+
+**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.**
+
+This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern`

Review Comment:
```suggestion
This example uses [`WatchFilePattern`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern`
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in
+the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline.

Review Comment:
```suggestion
the RunInference `PTransform` to automatically update the ML model without stopping the Beam pipeline.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+
+### Setting up source
+
+To read the image names, use a Pub/Sub topic as the source.
+ * The Pub/Sub topic emits a `UTF-8` encoded model path that is used to read and preprocess images to run the inference.
+
+### Models for image segmentation
+
+For the purpose of this example, use TensorFlow models saved in [HDF5](https://www.tensorflow.org/tutorials/keras/save_and_load#hdf5_format) format.
+
+
+### Pre-processing image for inference

Review Comment:
```suggestion
## Pre-process images for inference
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+The PubSub topic emits an image path. We need to read and preprocess the image to use it for RunInference. `read_image` function is used to read the image for inference.
+
+```python
+import io
+from PIL import Image
+from apache_beam.io.filesystems import FileSystems
+import numpy
+import tensorflow as tf
+
+def read_image(image_file_name):
+  with FileSystems().open(image_file_name, 'r') as file:
+    data = Image.open(io.BytesIO(file.read())).convert('RGB')
+  img = data.resize((224, 224))
+  img = numpy.array(img) / 255.0
+  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), dtype=tf.float32)
+  return img_tensor
+```
+
+Now, let's jump into the pipeline code.
+
+**Pipeline steps**:
+1. Get the image names from the PubSub topic.
+2. Read and pre-process the images using `read_image` function.

Review Comment:
```suggestion
2. Read and pre-process the images using the `read_image` function.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+3. Pass the images to the `RunInference` PTransform. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.

Review Comment:
```suggestion
3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+1. Get the image names from the PubSub topic.

Review Comment:
```suggestion
1. Get the image names from the Pub/Sub topic.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+
+For the [model_handler](https://github.com/apache/beam/blob/07f52a478174f8733c7efedb7189955142faa5fa/sdks/python/apache_beam/ml/inference/base.py#L308), we will be using [TFModelHandlerTensor](https://github.com/apache/beam/blob/186973b110d82838fb8e5ba27f0225a67c336591/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184).
+```python
+from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor
+# initialize TFModelHandlerTensor with a .h5 model saved in a directory accessible by the pipeline.
+tf_model_handler = TFModelHandlerTensor(model_uri='gs://<your-bucket>/<model_path.h5>')
+```
+
+The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) PCollection to the RunInference PTransform. This is used to update the models in the `model_handler` without needing to stop the beam pipeline.
+We will use `WatchFilePattern` as side input to watch a glob pattern matching `.h5` files.
+
+`model_metadata_pcoll` expects a `PCollection[ModelMetadata]` compatible with [AsSingleton](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Since the pipeline uses `WatchFilePattern` as side input, it will take care of windowing and wrapping the output into `ModelMetadata`.
+
+
+Once the pipeline starts processing data and when you see some outputs emitted from the `RunInference` PTransform, upload a `.h5` `TensorFlow` model that matches the `file_pattern` to the Google Cloud Storage bucket. RunInference will update the `model_uri` of `TFModelHandlerTensor` using `WatchFilePattern` as side input.
+
+**Note**: Side input update frequency is non-deterministic and can have longer interval between updates.

Review Comment:
```suggestion
**Note**: Side input update frequency is non-deterministic and can have longer intervals between updates.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+
+```python
+import apache_beam as beam
+from apache_beam.ml.inference.utils import WatchFilePattern
+from apache_beam.ml.inference.base import RunInference
+with beam.Pipeline() as pipeline:
+
+  file_pattern = 'gs://<your-bucket>/*.h5'
+  pubsub_topic = '<topic_emitting_image_names>'
+
+  side_input_pcoll = (
+    pipeline
+    | "FilePatternUpdates" >> WatchFilePattern(file_pattern=file_pattern))
+
+  images_pcoll = (
+    pipeline
+    | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=pubsub_topic)
+    | "DecodeBytes" >> beam.Map(lambda x: x.decode('utf-8'))
+    | "PreProcessImage" >> beam.Map(read_image)
+  )
+
+  inference_pcoll = (
+    images_pcoll
+    | "RunInference" >> RunInference(
+        model_handler=tf_model_handler,
+        model_metadata_pcoll=side_input_pcoll))
+
+```
+
+
+### Post-process `PredictionResult` object
+
+Once the inference is done, RunInference outputs `PredictionResult` object that contains `example`, `inference` and, `model_id`. Here, the `model_id` is used to identify which model is used for running the inference.

Review Comment:
```suggestion
When the inference is complete, RunInference outputs a `PredictionResult` object that contains `example`, `inference`, and `model_id`. Here, the `model_id` is used to identify which model is used for running the inference.
```

##########
website/www/site/content/en/documentation/ml/side-input-updates.md:
##########
@@ -0,0 +1,144 @@
+**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.**

Review Comment:
```suggestion
For more information about side inputs, see the [Side inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) section in the Apache Beam Programming Guide.
``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. + +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. 
It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in +the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline. + +### Setting up source Review Comment: ```suggestion ## Set up the source ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github. For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub. +## Slowly-updating side input pattern to auto update models used in RunInference +To perform auto updates of the models used in `RunInference` PTransform without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input PCollection to the input parameter `model_metadata_pcoll` of RunInference. + +`ModelMetdata` is a `NamedTuple` containing: + * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective ModelHandlers can load the models without errors. + + **For example**, `PyTorchModelHandler` loads a model using weights and a model class initially, and during the update, If you pass in weights from a different model class when you update the model via side inputs, it won’t load properly because it’s expecting the weights from the original model class. + * `model_name`: Human-readable name for the model. This can be used to identify the model in the metrics generated by the RunInference transform. 
Review Comment: ```suggestion * `model_name`: Human-readable name for the model. You can use this name to identify the model in the metrics generated by the RunInference transform. ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github. For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub. +## Slowly-updating side input pattern to auto update models used in RunInference +To perform auto updates of the models used in `RunInference` PTransform without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input PCollection to the input parameter `model_metadata_pcoll` of RunInference. + +`ModelMetdata` is a `NamedTuple` containing: + * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective ModelHandlers can load the models without errors. + + **For example**, `PyTorchModelHandler` loads a model using weights and a model class initially, and during the update, If you pass in weights from a different model class when you update the model via side inputs, it won’t load properly because it’s expecting the weights from the original model class. + * `model_name`: Human-readable name for the model. This can be used to identify the model in the metrics generated by the RunInference transform. + +Use cases: + * Use `WatchFilePattern` as side input to the RunInference PTransform for the auto updates of the ML model. 
More details can be found [here](/website/www/site/content/en/documentation/ml/side-input-updates.md). + +The side input PCollection must follow [AsSingleton](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html?highlight=assingleton#apache_beam.pvalue.AsSingleton) view or the pipeline will result in error. Review Comment: ```suggestion The side input `PCollection` must follow the [`AsSingleton`](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html?highlight=assingleton#apache_beam.pvalue.AsSingleton) view to avoid errors. ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github. For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub. +## Slowly-updating side input pattern to auto update models used in RunInference +To perform auto updates of the models used in `RunInference` PTransform without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input PCollection to the input parameter `model_metadata_pcoll` of RunInference. + +`ModelMetdata` is a `NamedTuple` containing: + * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective ModelHandlers can load the models without errors. Review Comment: ```suggestion * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. 
The URL or file path must be in the compatible format so that the respective `ModelHandlers` can load the models without errors. ``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. + +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in +the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline. + +### Setting up source + +To read the image names, use a Pub/Sub topic as the source. + * The Pub/Sub topic emits a `UTF-8` encoded model path that is used to read and preprocess images to run the inference. + +### Models for image segmentation Review Comment: ```suggestion ## Models for image segmentation ``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. 
This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. + +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in Review Comment: ```suggestion based on timestamps. It emits the latest [`ModelMetadata`](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in ``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. + +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in +the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline. + +### Setting up source + +To read the image names, use a Pub/Sub topic as the source. + * The Pub/Sub topic emits a `UTF-8` encoded model path that is used to read and preprocess images to run the inference. + +### Models for image segmentation + +For the purpose of this example, use TensorFlow models saved in [HDF5](https://www.tensorflow.org/tutorials/keras/save_and_load#hdf5_format) format. + + +### Pre-processing image for inference +The PubSub topic emits an image path. We need to read and preprocess the image to use it for RunInference. 
`read_image` function is used to read the image for inference. Review Comment: ```suggestion The Pub/Sub topic emits an image path. We need to read and preprocess the image to use it for RunInference. The `read_image` function is used to read the image for inference. ``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. 
+ +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in +the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline. + +### Setting up source + +To read the image names, use a Pub/Sub topic as the source. + * The Pub/Sub topic emits a `UTF-8` encoded model path that is used to read and preprocess images to run the inference. + +### Models for image segmentation + +For the purpose of this example, use TensorFlow models saved in [HDF5](https://www.tensorflow.org/tutorials/keras/save_and_load#hdf5_format) format. + + +### Pre-processing image for inference +The PubSub topic emits an image path. We need to read and preprocess the image to use it for RunInference. `read_image` function is used to read the image for inference. + +```python +import io +from PIL import Image +from apache_beam.io.filesystems import FileSystems +import numpy +import tensorflow as tf + +def read_image(image_file_name): + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + img = data.resize((224, 224)) + img = numpy.array(img) / 255.0 + img_tensor = tf.cast(tf.convert_to_tensor(img[...]), dtype=tf.float32) + return img_tensor +``` + +Now, let's jump into the pipeline code. + +**Pipeline steps**: +1. Get the image names from the PubSub topic. +2. Read and pre-process the images using `read_image` function. +3. Pass the images to the `RunInference` PTransform. 
RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters. + +For the [model_handler](https://github.com/apache/beam/blob/07f52a478174f8733c7efedb7189955142faa5fa/sdks/python/apache_beam/ml/inference/base.py#L308), we will be using [TFModelHandlerTensor](https://github.com/apache/beam/blob/186973b110d82838fb8e5ba27f0225a67c336591/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184). Review Comment: ```suggestion For the [`model_handler`](https://github.com/apache/beam/blob/07f52a478174f8733c7efedb7189955142faa5fa/sdks/python/apache_beam/ml/inference/base.py#L308), we use [`TFModelHandlerTensor`](https://github.com/apache/beam/blob/186973b110d82838fb8e5ba27f0225a67c336591/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184). ``` ########## website/www/site/layouts/partials/section-menu/en/documentation.html: ########## @@ -226,6 +226,7 @@ <li><a href="/documentation/ml/tensorrt-runinference">Build a custom model handler with TensorRT</a></li> <li><a href="/documentation/ml/large-language-modeling">Use LLM inference</a></li> <li><a href="/documentation/ml/multi-language-inference/">Build a multi-language inference pipeline</a></li> + <li><a href="/documentation/ml/side-input-updates/">Auto Model Updates in RunInference Transforms using SideInputs</a></li> Review Comment: ```suggestion <li><a href="/documentation/ml/side-input-updates/">Use side inputs to automatically update models</a></li> ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github. For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub.
+## Slowly-updating side input pattern to auto update models used in RunInference +To perform auto updates of the models used in `RunInference` PTransform without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input PCollection to the input parameter `model_metadata_pcoll` of RunInference. + +`ModelMetdata` is a `NamedTuple` containing: + * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective ModelHandlers can load the models without errors. + + **For example**, `PyTorchModelHandler` loads a model using weights and a model class initially, and during the update, If you pass in weights from a different model class when you update the model via side inputs, it won’t load properly because it’s expecting the weights from the original model class. + * `model_name`: Human-readable name for the model. This can be used to identify the model in the metrics generated by the RunInference transform. + +Use cases: + * Use `WatchFilePattern` as side input to the RunInference PTransform for the auto updates of the ML model. More details can be found [here](/website/www/site/content/en/documentation/ml/side-input-updates.md). + +The side input PCollection must follow [AsSingleton](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html?highlight=assingleton#apache_beam.pvalue.AsSingleton) view or the pipeline will result in error. 
+ +**Note**: If the main PCollection emits inputs and a side input has yet to receive inputs, the main PCollection is buffered until there is Review Comment: ```suggestion **Note**: If the main `PCollection` emits inputs and a side input has yet to receive inputs, the main `PCollection` is buffered until there is ``` ########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ########## @@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github. For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub. +## Slowly-updating side input pattern to auto update models used in RunInference +To perform auto updates of the models used in `RunInference` PTransform without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input PCollection to the input parameter `model_metadata_pcoll` of RunInference. + +`ModelMetdata` is a `NamedTuple` containing: + * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective ModelHandlers can load the models without errors. + + **For example**, `PyTorchModelHandler` loads a model using weights and a model class initially, and during the update, If you pass in weights from a different model class when you update the model via side inputs, it won’t load properly because it’s expecting the weights from the original model class. + * `model_name`: Human-readable name for the model. This can be used to identify the model in the metrics generated by the RunInference transform. 
+ +Use cases: + * Use `WatchFilePattern` as side input to the RunInference PTransform for the auto updates of the ML model. More details can be found [here](/website/www/site/content/en/documentation/ml/side-input-updates.md). + +The side input PCollection must follow [AsSingleton](https://beam.apache.org/releases/pydoc/current/apache_beam.pvalue.html?highlight=assingleton#apache_beam.pvalue.AsSingleton) view or the pipeline will result in error. + +**Note**: If the main PCollection emits inputs and a side input has yet to receive inputs, the main PCollection is buffered until there is + an update to the side input. This could happen with Global windowed side inputs with data driven triggers such as `AfterCount`, `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that is used to pass the respective `ModelHandler` as a side input. Review Comment: ```suggestion an update to the side input. This can happen with globally windowed side inputs that use data-driven triggers, such as `AfterCount` and `AfterProcessingTime`. Until the side input is updated, emit the default or initial model ID that was used to configure the respective `ModelHandler`. ``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. + +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input. `WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in +the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline. + +### Setting up source + +To read the image names, use a Pub/Sub topic as the source. + * The Pub/Sub topic emits a `UTF-8` encoded model path that is used to read and preprocess images to run the inference. + +### Models for image segmentation + +For the purpose of this example, use TensorFlow models saved in [HDF5](https://www.tensorflow.org/tutorials/keras/save_and_load#hdf5_format) format. + + +### Pre-processing image for inference +The PubSub topic emits an image path. We need to read and preprocess the image to use it for RunInference. 
`read_image` function is used to read the image for inference. + +```python +import io +from PIL import Image +from apache_beam.io.filesystems import FileSystems +import numpy +import tensorflow as tf + +def read_image(image_file_name): + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + img = data.resize((224, 224)) + img = numpy.array(img) / 255.0 + img_tensor = tf.cast(tf.convert_to_tensor(img[...]), dtype=tf.float32) + return img_tensor +``` + +Now, let's jump into the pipeline code. + +**Pipeline steps**: +1. Get the image names from the PubSub topic. +2. Read and pre-process the images using `read_image` function. +3. Pass the images to the `RunInference` PTransform. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters. + +For the [model_handler](https://github.com/apache/beam/blob/07f52a478174f8733c7efedb7189955142faa5fa/sdks/python/apache_beam/ml/inference/base.py#L308), we will be using [TFModelHandlerTensor](https://github.com/apache/beam/blob/186973b110d82838fb8e5ba27f0225a67c336591/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184). +```python +from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor +# initialize TFModelHandlerTensor with a .h5 model saved in a directory accessible by the pipeline. +tf_model_handler = TFModelHandlerTensor(model_uri='gs://<your-bucket>/<model_path.h5>') +``` + +The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) PCollection to the RunInference PTransform. This is used to update the models in the `model_handler` without needing to stop the beam pipeline. Review Comment: ```suggestion The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. 
This side input is used to update the models in the `model_handler` without stopping the Beam pipeline. ``` ########## website/www/site/content/en/documentation/ml/side-input-updates.md: ########## @@ -0,0 +1,144 @@ +--- +title: "Auto Model Updates in RunInference Transforms using SideInputs" +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +# Use WatchFilePattern as side input to auto-update ML models in RunInference + +The pipeline in this example uses a [RunInference](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/) `PTransform` with a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` that emits `ModelMetadata` to run inferences on images using TensorFlow models. + +Using side inputs, you can update your model (which is passed in the `ModelHandler`) in real-time, even while the Beam pipeline is still running. This can be done either by leveraging one of Beam's provided patterns, such as the `WatchFilePattern`, +or by configuring a custom side input PCollection that defines the logic for the model update. + +**More about `side inputs` can be found at https://beam.apache.org/documentation/programming-guide/#side-inputs.** + +This example uses [WatchFilePattern](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.utils.html#apache_beam.ml.inference.utils.WatchFilePattern) as a side input.
`WatchFilePattern` is used to watch for the file updates matching the `file_pattern` +based on timestamps. It emits the latest [ModelMetadata](https://beam.apache.org/documentation/transforms/python/elementwise/runinference/), which is used in +the RunInference `PTransform` to auto update the ML model without stopping the Beam pipeline. + +### Setting up source + +To read the image names, use a Pub/Sub topic as the source. + * The Pub/Sub topic emits a `UTF-8` encoded model path that is used to read and preprocess images to run the inference. + +### Models for image segmentation + +For the purpose of this example, use TensorFlow models saved in [HDF5](https://www.tensorflow.org/tutorials/keras/save_and_load#hdf5_format) format. + + +### Pre-processing image for inference +The PubSub topic emits an image path. We need to read and preprocess the image to use it for RunInference. `read_image` function is used to read the image for inference. + +```python +import io +from PIL import Image +from apache_beam.io.filesystems import FileSystems +import numpy +import tensorflow as tf + +def read_image(image_file_name): + with FileSystems().open(image_file_name, 'r') as file: + data = Image.open(io.BytesIO(file.read())).convert('RGB') + img = data.resize((224, 224)) + img = numpy.array(img) / 255.0 + img_tensor = tf.cast(tf.convert_to_tensor(img[...]), dtype=tf.float32) + return img_tensor +``` + +Now, let's jump into the pipeline code. + +**Pipeline steps**: +1. Get the image names from the PubSub topic. +2. Read and pre-process the images using `read_image` function. +3. Pass the images to the `RunInference` PTransform. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters. 
+ +For the [model_handler](https://github.com/apache/beam/blob/07f52a478174f8733c7efedb7189955142faa5fa/sdks/python/apache_beam/ml/inference/base.py#L308), we will be using [TFModelHandlerTensor](https://github.com/apache/beam/blob/186973b110d82838fb8e5ba27f0225a67c336591/sdks/python/apache_beam/ml/inference/tensorflow_inference.py#L184). +```python +from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor +# initialize TFModelHandlerTensor with a .h5 model saved in a directory accessible by the pipeline. +tf_model_handler = TFModelHandlerTensor(model_uri='gs://<your-bucket>/<model_path.h5>') +``` + +The `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) PCollection to the RunInference PTransform. This is used to update the models in the `model_handler` without needing to stop the beam pipeline. +We will use `WatchFilePattern` as side input to watch a glob pattern matching `.h5` files. + +`model_metadata_pcoll` expects a `PCollection[ModelMetadata]` compatible with [AsSingleton](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Since the pipeline uses `WatchFilePattern` as side input, it will take care of windowing and wrapping the output into `ModelMetadata`. + + +Once the pipeline starts processing data and when you see some outputs emitted from the `RunInference` PTransform, upload a `.h5` `TensorFlow` model that matches the `file_pattern` to the Google Cloud Storage bucket. RunInference will update the `model_uri` of `TFModelHandlerTensor` using `WatchFilePattern` as side input. Review Comment: ```suggestion After the pipeline starts processing data and when you see some outputs emitted from the RunInference `PTransform`, upload a `.h5` `TensorFlow` model that matches the `file_pattern` to the Google Cloud Storage bucket. 
RunInference will update the `model_uri` of `TFModelHandlerTensor` using `WatchFilePattern` as a side input.
```

########## website/www/site/content/en/documentation/ml/side-input-updates.md: ##########
@@ -0,0 +1,144 @@
+`model_metadata_pcoll` expects a `PCollection[ModelMetadata]` compatible with [AsSingleton](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Since the pipeline uses `WatchFilePattern` as side input, it will take care of windowing and wrapping the output into `ModelMetadata`.

Review Comment:
```suggestion
`model_metadata_pcoll` expects a `PCollection[ModelMetadata]` compatible with the [`AsSingleton`](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.pvalue.html#apache_beam.pvalue.AsSingleton) view. Because the pipeline uses `WatchFilePattern` as a side input, it takes care of windowing and wrapping the output into `ModelMetadata`.
```

########## website/www/site/content/en/documentation/ml/side-input-updates.md: ##########
@@ -0,0 +1,144 @@
+**Note**: Side input update frequency is non-deterministic and can have longer interval between updates.
+
+```python
+import apache_beam as beam
+from apache_beam.ml.inference.utils import WatchFilePattern
+from apache_beam.ml.inference.base import RunInference
+with beam.Pipeline() as pipeline:
+
+  file_pattern = 'gs://<your-bucket>/*.h5'
+  pubsub_topic = '<topic_emitting_image_names>'
+
+  side_input_pcoll = (
+      pipeline
+      | "FilePatternUpdates" >> WatchFilePattern(file_pattern=file_pattern))
+
+  images_pcoll = (
+      pipeline
+      | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=pubsub_topic)
+      | "DecodeBytes" >> beam.Map(lambda x: x.decode('utf-8'))
+      | "PreProcessImage" >> beam.Map(read_image)
+  )
+
+  inference_pcoll = (
+      images_pcoll
+      | "RunInference" >> RunInference(
+          model_handler=tf_model_handler,
+          model_metadata_pcoll=side_input_pcoll))
+
+```
+
+
+### Post-process `PredictionResult` object
+
+Once the inference is done, RunInference outputs `PredictionResult` object that contains `example`, `inference` and, `model_id`. Here, the `model_id` is used to identify which model is used for running the inference.
+
+```python
+from apache_beam.ml.inference.base import PredictionResult
+
+class PostProcessor(beam.DoFn):
+  """
+  Process the PredictionResult to get the predicted label and model id used for inference.
+  """
+  def process(self, element: PredictionResult) -> typing.Iterable[str]:
+    predicted_class = numpy.argmax(element.inference[0], axis=-1)
+    labels_path = tf.keras.utils.get_file(
+        'ImageNetLabels.txt',
+        'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
+    )
+    imagenet_labels = numpy.array(open(labels_path).read().splitlines())
+    predicted_class_name = imagenet_labels[predicted_class]
+    return predicted_class_name.title(), element.model_id
+
+post_processor_pcoll = (inference_pcoll | "PostProcessor" >> PostProcessor())
+```
+
+## Run the pipeline
+```python
+result = pipeline.run().wait_until_finish()
+```
+**Note**: `model_name` of the `ModelMetaData` object will be attached as prefix to the [metrics](https://beam.apache.org/documentation/ml/runinference-metrics/) calculated by the RunInference PTransform.

Review Comment:
```suggestion
**Note**: The `model_name` of the `ModelMetadata` object is attached as a prefix to the [metrics](https://beam.apache.org/documentation/ml/runinference-metrics/) calculated by the RunInference `PTransform`.
```

########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ##########
@@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github.
 For detailed instructions explaining how to build and run a Python pipeline that uses ML models, see the [Example RunInference API pipelines](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) on GitHub.
+## Slowly-updating side input pattern to auto update models used in RunInference
+To perform auto updates of the models used in `RunInference` PTransform without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input PCollection to the input parameter `model_metadata_pcoll` of RunInference.
Review Comment:
```suggestion
To perform automatic updates of the models used with the RunInference `PTransform` without stopping the Beam pipeline, pass a [`ModelMetadata`](https://beam.apache.org/releases/pydoc/current/apache_beam.ml.inference.base.html#apache_beam.ml.inference.base.ModelMetadata) side input `PCollection` to the RunInference input parameter `model_metadata_pcoll`.
```

########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ##########
@@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github.
+## Slowly-updating side input pattern to auto update models used in RunInference

Review Comment:
```suggestion
## Slowly-updating side input pattern to auto-update models used in RunInference
```

########## website/www/site/content/en/documentation/ml/side-input-updates.md: ##########
@@ -0,0 +1,144 @@
+### Post-process `PredictionResult` object

Review Comment:
```suggestion
## Post-process the `PredictionResult` object
```

########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ##########
@@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github.
+`ModelMetdata` is a `NamedTuple` containing:
+ * `model_id`: Unique identifier for the model. This can be a file path or a URL where the model can be accessed. It is used to load the model for inference. The URL or file path must be in the compatible format so that the respective ModelHandlers can load the models without errors.
+
+   **For example**, `PyTorchModelHandler` loads a model using weights and a model class initially, and during the update, If you pass in weights from a different model class when you update the model via side inputs, it won’t load properly because it’s expecting the weights from the original model class.

Review Comment:
```suggestion
   **For example**, `PyTorchModelHandler` initially loads a model using weights and a model class. If you pass in weights from a different model class when you update the model using side inputs, the model doesn't load properly, because it expects the weights from the original model class.
```

########## website/www/site/content/en/documentation/sdks/python-machine-learning.md: ##########
@@ -243,6 +243,23 @@ For more information, see the [`PredictionResult` documentation](https://github.
+ * `model_name`: Human-readable name for the model. This can be used to identify the model in the metrics generated by the RunInference transform.
+
+Use cases:
+ * Use `WatchFilePattern` as side input to the RunInference PTransform for the auto updates of the ML model. More details can be found [here](/website/www/site/content/en/documentation/ml/side-input-updates.md).

Review Comment:
```suggestion
 * Use `WatchFilePattern` as a side input to the RunInference `PTransform` to automatically update the ML model. For more information, see [Use `WatchFilePattern` as side input to auto-update ML models in RunInference](https://beam.apache.org/documentation/ml/side-input-updates).
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
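The side-input-updates.md excerpt describes `WatchFilePattern` as watching for files that match `file_pattern` "based on timestamps" and emitting the latest `ModelMetadata`. To make that "newest match wins" idea concrete, here is a minimal local sketch using only the Python standard library. It is not Beam's implementation. `ModelMetadataSketch` and `latest_model_metadata` are hypothetical names, the scan runs once instead of continuously, and deriving `model_name` from the file name is an assumption made for illustration.

```python
import glob
import os
from typing import NamedTuple, Optional


class ModelMetadataSketch(NamedTuple):
  """Hypothetical stand-in for the two fields of Beam's ModelMetadata."""
  model_id: str    # path or URL that a model handler would load from
  model_name: str  # human-readable name, for example for metrics


def latest_model_metadata(file_pattern: str) -> Optional[ModelMetadataSketch]:
  """Returns metadata for the most recently modified file matching file_pattern.

  One-shot local scan for illustration; Beam's WatchFilePattern instead keeps
  watching the pattern while the pipeline runs.
  """
  matches = glob.glob(file_pattern)
  if not matches:
    return None
  # Pick the match with the newest modification timestamp.
  newest = max(matches, key=os.path.getmtime)
  # Assumption: use the file name without its extension as the display name.
  name = os.path.splitext(os.path.basename(newest))[0]
  return ModelMetadataSketch(model_id=newest, model_name=name)
```

In the pipeline under review, `WatchFilePattern(file_pattern='gs://<your-bucket>/*.h5')` plays this role continuously and feeds `model_metadata_pcoll`.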
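The `PostProcessor` step in the excerpt turns each `PredictionResult` into a label plus the `model_id` that produced it. A minimal, framework-free sketch of that logic follows. `PredictionResultSketch` and `post_process` are hypothetical stand-ins, and the sketch assumes `inference` holds one score per class for a single example. It yields the `(label, model_id)` pair as one element: in the Beam Python SDK, whatever `DoFn.process` returns is iterated and each item is emitted, so returning a bare tuple would emit the label and the `model_id` as two separate elements.

```python
from typing import Iterator, NamedTuple, Sequence, Tuple


class PredictionResultSketch(NamedTuple):
  """Hypothetical stand-in with the fields the excerpt lists for PredictionResult."""
  example: object
  inference: Sequence[float]  # assumption: one score per class for one example
  model_id: str


def post_process(element: PredictionResultSketch,
                 labels: Sequence[str]) -> Iterator[Tuple[str, str]]:
  """Yields a single (label, model_id) pair for one prediction."""
  scores = element.inference
  # Index of the highest score (an argmax without NumPy).
  predicted_class = max(range(len(scores)), key=scores.__getitem__)
  # Yield the pair as one output element rather than returning the bare tuple.
  yield labels[predicted_class].title(), element.model_id
```

In a pipeline, a generator like this could be applied with `beam.FlatMap(post_process, labels)`, or wrapped in a `DoFn` that is applied through `beam.ParDo`. Loading the label list once and passing it in avoids reopening the labels file for every element.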
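The python-machine-learning.md excerpt says `model_id` is what a model handler uses to load the model, and that updated weights must be compatible with what the handler originally loaded. The toy class below illustrates the general idea of swapping a model only when a new `model_id` arrives and tagging each prediction with the id that served it. It is a hypothetical single-process sketch (`SwappableModel` and the `loader` callable are invented for illustration) and does not reflect how RunInference manages models internally.

```python
from typing import Callable, Tuple

# Hypothetical loader type: takes a model_id (path or URL) and returns a model,
# simplified here to a function from a float input to a float prediction.
Loader = Callable[[str], Callable[[float], float]]


class SwappableModel:
  """Toy stand-in that keeps serving one model until a new model_id arrives."""

  def __init__(self, loader: Loader, model_id: str):
    self._loader = loader
    self._model_id = model_id
    self._model = loader(model_id)

  def maybe_update(self, new_model_id: str) -> bool:
    """Reloads only if the update points at a different model; returns True if swapped."""
    if new_model_id == self._model_id:
      return False
    # Load first, then swap, so a failed load leaves the old model in place.
    new_model = self._loader(new_model_id)
    self._model, self._model_id = new_model, new_model_id
    return True

  def predict(self, x: float) -> Tuple[float, str]:
    """Returns the prediction together with the model_id that produced it."""
    return self._model(x), self._model_id
```

Loading before swapping is a design choice of this sketch: if the new `model_id` cannot be loaded (for example, incompatible weights), the previously loaded model keeps serving.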
