damccorm commented on code in PR #26048:
URL: https://github.com/apache/beam/pull/26048#discussion_r1160236029
##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,475 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "OsFaZscKSPvo"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Update ML models in running pipelines"
+ ],
+ "metadata": {
+ "id": "ZUSiAR62SgO8"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The pipeline in this notebook uses a RunInference `PTransform` to run
inference on images using TensorFlow models. To update the model, it uses a
side input `PCollection` that emits `ModelMetadata`.\n",
+ "\n",
+ "You can use side inputs to update your model in real time, even while the Apache Beam pipeline is running. The side input is passed to the RunInference `PTransform` through the `model_metadata_pcoll` parameter. You can update the model either by using one of the patterns that Apache Beam provides, such as `WatchFilePattern`, or by configuring a custom side input `PCollection` that defines the logic for the model update (a minimal sketch of the custom option appears later in this notebook, after the `WatchFilePattern` step).\n",
+ "\n",
+ "For more information about side inputs, see the [Side
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs)
section in the Apache Beam Programming Guide.\n",
+ "\n",
+ "This example uses `WatchFilePattern` as the side input. `WatchFilePattern` watches for file updates that match the `file_pattern`, based on file timestamps. It emits the latest `ModelMetadata`, which the RunInference `PTransform` uses to automatically update the ML model without stopping the Apache Beam pipeline.\n"
+ ],
+ "metadata": {
+ "id": "tBtqF5UpKJNZ"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Install the dependencies required to run this notebook.\n",
+ "\n",
+ "To use RunInference with side inputs for automatic model updates,
install `Apache Beam` version `2.46.0` or later."
+ ],
+ "metadata": {
+ "id": "SPuXFowiTpWx"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "1RyTYsFEIOlA"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install 'apache_beam[gcp]>=2.46.0' --quiet\n",
+ "!pip install tensorflow\n",
+ "!pip install tensorflow_hub"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Imports required for the notebook.\n",
+ "import logging\n",
+ "import time\n",
+ "from typing import Iterable\n",
+ "from typing import Tuple\n",
+ "\n",
+ "# Used by the image preprocessing and post-processing cells below.\n",
+ "import numpy\n",
+ "import tensorflow as tf\n",
+ "from PIL import Image\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.examples.inference.tensorflow_imagenet_segmentation
import PostProcessor\n",
+ "from apache_beam.examples.inference.tensorflow_imagenet_segmentation
import read_image\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.tensorflow_inference import
TFModelHandlerTensor\n",
+ "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+ "from apache_beam.options.pipeline_options import
GoogleCloudOptions\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.options.pipeline_options import SetupOptions\n",
+ "from apache_beam.options.pipeline_options import StandardOptions\n",
+ "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+ ],
+ "metadata": {
+ "id": "Rs4cwwNrIV9H"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Authenticate to your Google Cloud account.\n",
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "jAKpPcmmGm03"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Runner\n",
+ "\n",
+ "This pipeline runs on the Dataflow Runner. Ensure that you have all
the required permissions to run the pipeline on Dataflow.\n",
+ "\n",
+ "Configure the pipeline options so that the pipeline runs on Dataflow. Make sure that the pipeline uses streaming mode."
+ ],
+ "metadata": {
+ "id": "ORYNKhH3WQyP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "options = PipelineOptions()\n",
+ "options.view_as(StandardOptions).streaming = True\n",
+ "\n",
+ "# Provide required pipeline options for the Dataflow Runner.\n",
+ "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+ "\n",
+ "# Set the Google Cloud project. Replace 'your-project' with your project ID.\n",
+ "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+ "\n",
+ "# Set the Google Cloud region that you want to run Dataflow in.\n",
+ "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+ "\n",
+ "# IMPORTANT: Update the following line to choose a Cloud Storage
location.\n",
+ "dataflow_gcs_location = \"gs://BUCKET_NAME/tmp/\"\n",
+ "\n",
+ "# The Dataflow staging location. This location is used to stage the
Dataflow pipeline and the SDK binary.\n",
+ "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' %
dataflow_gcs_location\n",
+ "\n",
+ "# The Dataflow temp location. This location is used to store
temporary files or intermediate results before outputting to the sink.\n",
+ "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' %
dataflow_gcs_location\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "wWjbnq6X-4uE"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Install the `tensorflow` and `tensorflow_hub` dependencies on
Dataflow. Use the `requirements_file` pipeline option to pass these
dependencies."
+ ],
+ "metadata": {
+ "id": "HTJV8pO2Wcw4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# In a requirements file, define the dependencies required for the
pipeline.\n",
+ "deps_required_for_pipeline = ['tensorflow>=2.12.0',
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+ "requirements_file_path = './requirements.txt'\n",
+ "# Write the dependencies to the requirements file.\n",
+ "with open(requirements_file_path, 'w') as f:\n",
+ " for dep in deps_required_for_pipeline:\n",
+ " f.write(dep + '\\n')\n",
+ "\n",
+ "# Install the pipeline dependencies on Dataflow.\n",
+ "options.view_as(SetupOptions).requirements_file =
requirements_file_path"
+ ],
+ "metadata": {
+ "id": "lEy4PkluWbdm"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## TensorFlow ModelHandler\n",
+ "This example uses `TFModelHandlerTensor` as the model handler and a `resnet_101` model trained on ImageNet as the initial model for inference.\n",
+ "\n",
+ "Download the model from [Google Cloud Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5) (the link downloads the model), and upload it to the Cloud Storage location that the `model_uri` below points to. The following cell sketches one way to do this."
+ ],
+ "metadata": {
+ "id": "_AUNH_GJk_NE"
+ }
+ },
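+ {
+ "cell_type": "markdown",
+ "source": [
+ "The following cell is a minimal sketch of one way to stage the initial model, assuming you run this notebook in Colab, where `wget` and `gsutil` are available. Replace `BUCKET_NAME` with your bucket name."
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Download the initial model weights and copy them to your bucket.\n",
+ "# Replace BUCKET_NAME with the name of your Cloud Storage bucket.\n",
+ "!wget -q https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5\n",
+ "!gsutil cp resnet101_weights_tf_dim_ordering_tf_kernels.h5 gs://BUCKET_NAME/"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },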
+ {
+ "cell_type": "code",
+ "source": [
+ "model_handler = TFModelHandlerTensor(\n",
+ "
model_uri=\"gs://BUCKET_NAME/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+ ],
+ "metadata": {
+ "id": "kkSnsxwUk-Sp"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Pre-process images\n",
+ "\n",
+ "Use `preprocess_image` to read the image and convert it to a TensorFlow tensor so that it can be used for inference."
+ ],
+ "metadata": {
+ "id": "tZH0r0sL-if5"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def preprocess_image(image_name, image_dir):\n",
+ " img = tf.keras.utils.get_file(image_name, image_dir +
image_name)\n",
+ " img = Image.open(img).resize((224, 224))\n",
+ " img = numpy.array(img) / 255.0\n",
+ " img_tensor = tf.cast(tf.convert_to_tensor(img[...]),
dtype=tf.float32)\n",
+ " return img_tensor"
+ ],
+ "metadata": {
+ "id": "dU5imgTt-8Ne"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class PostProcessor(beam.DoFn):\n",
+ " \"\"\"Processes the PredictionResult to get the predicted label\n",
+ " and the ID of the model used to run the inference.\n",
+ " \"\"\"\n",
+ " def process(self, element: PredictionResult) -> Iterable[Tuple[str,
str]]:\n",
+ " predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+ " labels_path = tf.keras.utils.get_file(\n",
+ " 'ImageNetLabels.txt',\n",
+ "
'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
# pylint: disable=line-too-long\n",
+ " )\n",
+ " imagenet_labels =
numpy.array(open(labels_path).read().splitlines())\n",
+ " predicted_class_name = imagenet_labels[predicted_class]\n",
+ " yield predicted_class_name.title(), element.model_id"
+ ],
+ "metadata": {
+ "id": "6V5tJxO6-gyt"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define the pipeline object.\n",
+ "pipeline = beam.Pipeline(options=options)"
+ ],
+ "metadata": {
+ "id": "GpdKk72O_NXT"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Next, review the pipeline steps and examine the code.\n",
+ "\n",
+ "### Pipeline steps\n"
+ ],
+ "metadata": {
+ "id": "elZ53uxc_9Hv"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "1. Create a `PeriodicImpulse` transform, which emits an element every `n` seconds. `PeriodicImpulse` generates a sequence of elements at a given fixed interval.\n",
+ "\n",
+ "   In this example, `PeriodicImpulse` mimics a Pub/Sub source: because the inputs in a streaming pipeline arrive at intervals, use `PeriodicImpulse` to emit elements at the interval you configure. A sketch of reading from an actual Pub/Sub topic follows the next cell.\n",
+ "\n",
+ "To learn more about `PeriodicImpulse`, see the [`PeriodicImpulse` code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)."
+ ],
+ "metadata": {
+ "id": "305tkV2sAD-S"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "start_timestamp = time.time() # start timestamp of the periodic
impulse\n",
+ "end_timestamp = start_timestamp + 60 * 20 # end timestamp of the
periodic impulse (will run for 20 minutes).\n",
+ "main_input_fire_interval = 60 # interval in seconds at which the main
input PCollection is emitted.\n",
+ "side_input_fire_interval = 60 # interval in seconds at which the side
input PCollection is emitted.\n",
+ "\n",
+ "periodic_impulse = (\n",
+ " pipeline\n",
+ " | \"MainInputPcoll\" >> PeriodicImpulse(\n",
+ " start_timestamp=start_timestamp,\n",
+ " stop_timestamp=end_timestamp,\n",
+ " fire_interval=main_input_fire_interval))"
+ ],
+ "metadata": {
+ "id": "vUFStz66_Tbb"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
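+ {
+ "cell_type": "markdown",
+ "source": [
+ "In a production pipeline, the main input typically comes from a real streaming source rather than `PeriodicImpulse`. The commented-out sketch below shows how the previous cell might look with Pub/Sub; the topic name is a placeholder, not a resource this example creates."
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Sketch only: read the main input from a Pub/Sub topic instead of using\n",
+ "# PeriodicImpulse. The topic name below is a placeholder.\n",
+ "# main_input = (\n",
+ "#     pipeline\n",
+ "#     | \"ReadFromPubSub\" >> beam.io.ReadFromPubSub(\n",
+ "#         topic='projects/your-project/topics/your-topic'))"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },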
+ {
+ "cell_type": "markdown",
+ "source": [
+ "2. To read and pre-process the images, use the `read_image` function.
This example uses `Cat-with-beanie.jpg` for all inferences.\n",
+ "\n",
+ " **Note**: The image used for prediction is licensed under CC-BY; the creator is listed in the [LICENSE.txt](https://storage.googleapis.com/apache-beam-samples/image_captioning/LICENSE.txt) file."
+ ],
+ "metadata": {
+ "id": "8-sal2rFAxP2"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+
""
+ ],
+ "metadata": {
+ "id": "gW4cE8bhXS-d"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "image_data = (periodic_impulse | beam.Map(lambda x:
\"Cat-with-beanie.jpg\")\n",
+ " | \"ReadImage\" >> beam.Map(lambda image_name: read_image(\n",
+ " image_name=image_name,
image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))"
+ ],
+ "metadata": {
+ "id": "dGg11TpV_aV6"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
+ "   * `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. This side input is used to update the `model_uri` in the `model_handler` without stopping the Apache Beam pipeline. This example uses `WatchFilePattern` as the side input to watch for `.h5` files that match the `file_pattern`, which in this case is `'gs://BUCKET_NAME/*.h5'`.\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "eB0-ewd-BCKE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The side input used to watch for .h5 files and update the model_uri of the TFModelHandlerTensor.\n",
+ "file_pattern = 'gs://BUCKET_NAME/*.h5'\n",
+ "side_input_pcoll = (\n",
+ "    pipeline\n",
+ "    | \"WatchFilePattern\" >> WatchFilePattern(file_pattern=file_pattern,\n",
+ "                                             interval=side_input_fire_interval,\n",
+ "                                             stop_timestamp=end_timestamp))\n",
+ "inferences = (\n",
+ "    image_data\n",
+ "    | \"ApplyWindowing\" >> beam.WindowInto(beam.window.FixedWindows(10))\n",
+ "    | \"RunInference\" >> RunInference(model_handler=model_handler,\n",
+ "                                     model_metadata_pcoll=side_input_pcoll))"
+ ],
+ "metadata": {
+ "id": "_AjvvexJ_hUq"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
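+ {
+ "cell_type": "markdown",
+ "source": [
+ "As mentioned earlier, instead of `WatchFilePattern` you can configure a custom side input `PCollection` that defines the logic for the model update. The following cell is a minimal sketch of that option: each tick of a periodic source is mapped to a `ModelMetadata` object. The function is illustrative and isn't used by this example; the `gs://` path and model name are placeholders."
+ ],
+ "metadata": {}
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Sketch only: a custom model-update side input. Each tick of a periodic\n",
+ "# source is mapped to a ModelMetadata object that points RunInference at a\n",
+ "# new model. The gs:// path and model name below are placeholders.\n",
+ "from apache_beam.ml.inference.base import ModelMetadata\n",
+ "\n",
+ "def make_custom_side_input(p):\n",
+ "  return (\n",
+ "      p\n",
+ "      | \"UpdateImpulse\" >> PeriodicImpulse(fire_interval=60)\n",
+ "      | \"ToModelMetadata\" >> beam.Map(lambda _: ModelMetadata(\n",
+ "          model_id='gs://BUCKET_NAME/new_model.h5',\n",
+ "          model_name='new_model')))"
+ ],
+ "metadata": {},
+ "execution_count": null,
+ "outputs": []
+ },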
+ {
+ "cell_type": "markdown",
+ "source": [
+ "4. Post-process the `PredictionResult` object.\n",
+ "\n",
+ "   When the inference is complete, RunInference outputs a `PredictionResult` object that contains the fields `example`, `inference`, and `model_id`. The `model_id` field identifies the model used to run the inference. The `PostProcessor` returns the predicted label and the ID of the model that produced it."
+ ],
+ "metadata": {
+ "id": "lTA4wRWNDVis"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "post_processor = (\n",
+ " inferences\n",
+ " | \"PostProcessResults\" >> beam.ParDo(PostProcessor())\n",
+ " | \"LogResults\" >> beam.Map(logging.info))"
+ ],
+ "metadata": {
+ "id": "9TB76fo-_vZJ"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**How to watch for the automatic model update**\n",
+ "\n",
+ " After the pipeline starts processing data and when you see output
emitted from the RunInference `PTransform`, upload a `resnet152` model saved in
`.h5` format (for example,
[resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5))
that matches the `file_pattern` to the Google Cloud Storage bucket.
RunInference uses `WatchFilePattern` as a side input to update the `model_uri`
of `TFModelHandlerTensor`."
Review Comment:
```suggestion
" After the pipeline starts processing data and when you see output
emitted from the RunInference `PTransform`, upload a `resnet152` model saved in
`.h5` format to a Google Cloud Storage bucket location that matches the
`file_pattern` you defined earlier. You can download a copy of the model by
clicking [this
link](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5).
RunInference uses `WatchFilePattern` as a side input to update the `model_uri`
of `TFModelHandlerTensor`."
```
Clarity edit suggested. Also, it wasn't clear that the link downloads a model, which can be alarming for readers who don't trust the notebook as a source (or just aren't expecting that to happen).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]