rszper commented on code in PR #26048:
URL: https://github.com/apache/beam/pull/26048#discussion_r1156175036


##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the depencies to the requirements file.\n",

Review Comment:
   ```suggestion
           "# Write the dependencies to the requirements file.\n",
   ```



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the depencies to the requirements file.\n",
+        "with open(requirements_file_path, 'w') as f:\n",
+        "  for dep in deps_required_for_pipeline:\n",
+        "    f.write(dep + '\\n')\n",
+        "\n",
+        "# Install the pipeline dependencies on Dataflow.\n",
+        "options.view_as(SetupOptions).requirements_file = 
requirements_file_path"
+      ],
+      "metadata": {
+        "id": "lEy4PkluWbdm"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TensorFlow ModelHandler\n",
+        " This example uses `TFModelHandlerTensor` as the model handler and 
the `resnet_101` model trained on imagenet.\n",
+        "\n",
+        " Download the model from [Google Cloud 
Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5)
 (link downloads the model), and place it in the directory that you want to use 
to update your model."
+      ],
+      "metadata": {
+        "id": "_AUNH_GJk_NE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model_handler = TFModelHandlerTensor(\n",
+        "    
model_uri=\"gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+      ],
+      "metadata": {
+        "id": "kkSnsxwUk-Sp"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Pre-process images\n",
+        "\n",
+        "Use `preprocess_image` to run the inference, read the image, and 
convert the image to a TensorFlow tensor."
+      ],
+      "metadata": {
+        "id": "tZH0r0sL-if5"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def preprocess_image(image_name, image_dir):\n",
+        "  img = tf.keras.utils.get_file(image_name, image_dir + 
image_name)\n",
+        "  img = Image.open(img).resize((224, 224))\n",
+        "  img = numpy.array(img) / 255.0\n",
+        "  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), 
dtype=tf.float32)\n",
+        "  return img_tensor"
+      ],
+      "metadata": {
+        "id": "dU5imgTt-8Ne"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "class PostProcessor(beam.DoFn):\n",
+        "  \"\"\"Process the PredictionResult to get the predicted label.\n",
+        "  Returns predicted label.\n",
+        "  \"\"\"\n",
+        "  def process(self, element: PredictionResult) -> Iterable[Tuple[str, 
str]]:\n",
+        "    predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+        "    labels_path = tf.keras.utils.get_file(\n",
+        "        'ImageNetLabels.txt',\n",
+        "        
'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
  # pylint: disable=line-too-long\n",
+        "    )\n",
+        "    imagenet_labels = 
numpy.array(open(labels_path).read().splitlines())\n",
+        "    predicted_class_name = imagenet_labels[predicted_class]\n",
+        "    yield predicted_class_name.title(), element.model_id"
+      ],
+      "metadata": {
+        "id": "6V5tJxO6-gyt"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define the pipeline object.\n",
+        "pipeline = beam.Pipeline(options=options)"
+      ],
+      "metadata": {
+        "id": "GpdKk72O_NXT"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Next, review the pipeline steps and examine the code.\n",
+        "\n",
+        "### Pipeline steps\n"
+      ],
+      "metadata": {
+        "id": "elZ53uxc_9Hv"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "1. Create a `PeriodicImpulse`, which emits output every `n` seconds. 
The `PeriodicImpulse` transform generates an infinite sequence of elements with 
a given runtime interval.\n",
+        "\n",
+        "  In this example, `PeriodicImpulse` mimics the Pub/Sub source. 
Because the inputs in a streaming pipeline arrives in intervals, use 
`PeriodicImpulse` to output elements at `m` intervals.\n",
+        "\n",
+        "To learn more about `PeriodicImpulse`, see the [`PeriodicImpulse` 
code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)."
+      ],
+      "metadata": {
+        "id": "305tkV2sAD-S"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "start_timestamp = time.time() # start timestamp of the periodic 
impulse\n",
+        "end_timestamp = start_timestamp + 60 * 20 # end timestamp of the 
periodic impulse.\n",
+        "main_input_fire_interval = 60 # interval at which the main input 
PCollection is emitted.\n",
+        "side_input_fire_interval = 60 # interval at which the side input 
PCollection is emitted.\n",
+        "\n",
+        "periodic_impulse = (\n",
+        "      pipeline\n",
+        "      | \"MainInputPcoll\" >> PeriodicImpulse(\n",
+        "          start_timestamp=start_timestamp,\n",
+        "          stop_timestamp=end_timestamp,\n",
+        "          fire_interval=main_input_fire_interval)"
+      ],
+      "metadata": {
+        "id": "vUFStz66_Tbb"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "2. To read and pre-process the images, use the `read_image` function. 
This example uses `Cat-with-beanie.jpg` for all inferences."
+      ],
+      "metadata": {
+        "id": "8-sal2rFAxP2"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "image_data = (periodic_impulse | beam.Map(lambda x: 
\"Cat-with-beanie.jpg\")\n",
+        "      | \"ReadImage\" >> beam.Map(lambda image_name: read_image(\n",
+        "          image_name=image_name, 
image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))"
+      ],
+      "metadata": {
+        "id": "dGg11TpV_aV6"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "3. Pass the images to the RunInference `PTransform`. RunInference 
takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
+        "  * `model_metadata_pcoll` is a [side 
input](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
`PCollection` to the RunInference `PTransform`. This side input is used to 
update the `model_uri` in the `model_handler` without needing to stop the 
Apache Beam pipeline. Use `WatchFilePattern` as side input to watch a 
`file_pattern` matching `.h5` files. In this case, the `file_pattern` is 
`'gs://your-bucket/*.h5'`.\n",
+        "\n",
+        "  **How to watch for the automatic model update**\n",
+        "\n",
+        "  After the pipeline starts processing data and when you see output 
emitted from the RunInference `PTransform`, upload a `.h5` `TensorFlow` model 
(for example, 
[resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5))
 that matches the `file_pattern` to the Google Cloud Storage bucket. 
RunInference uses `WatchFilePattern` as a side input to update the `model_uri` 
of `TFModelHandlerTensor`.\n"
+      ],
+      "metadata": {
+        "id": "eB0-ewd-BCKE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        " # The side input used to watch for the .h5 file and update the 
model_uri of the TFModelHandlerTensor.\n",
+        " file_pattern = 'gs://your-bucket/*.h5'\n",

Review Comment:
   ```suggestion
           " file_pattern = 'gs://BUCKET_NAME/*.h5'\n",
   ```
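
For context on the placeholder suggested above, here is a minimal sketch of how a pattern such as `gs://BUCKET_NAME/*.h5` could be built from a bucket name and which object paths it would select. The helper names `build_file_pattern` and `matching_paths` are hypothetical and not part of the notebook or of Apache Beam. The matching uses Python's `fnmatch` as a local approximation only: its `*` also matches `/`, so it may treat nested objects differently from the matcher that `WatchFilePattern` relies on.

```python
import fnmatch

# Hypothetical placeholder; replace with the name of your Cloud Storage bucket.
BUCKET_NAME = "BUCKET_NAME"


def build_file_pattern(bucket_name: str, suffix: str = ".h5") -> str:
    """Return a glob such as gs://BUCKET_NAME/*.h5 for the given bucket."""
    return f"gs://{bucket_name}/*{suffix}"


def matching_paths(pattern: str, paths: list) -> list:
    """Return the paths that match the glob (local fnmatch approximation)."""
    return [p for p in paths if fnmatch.fnmatchcase(p, pattern)]


file_pattern = build_file_pattern(BUCKET_NAME)

# Example object paths, made up for illustration.
candidates = [
    "gs://BUCKET_NAME/resnet101_weights_tf_dim_ordering_tf_kernels.h5",
    "gs://BUCKET_NAME/resnet152_weights_tf_dim_ordering_tf_kernels.h5",
    "gs://BUCKET_NAME/notes.txt",
]

# Only the two .h5 objects are selected; notes.txt is ignored.
print(matching_paths(file_pattern, candidates))
```

Keeping the bucket name in one variable means the `model_uri` and the `file_pattern` cannot drift apart when a reader substitutes their own bucket.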



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the depencies to the requirements file.\n",
+        "with open(requirements_file_path, 'w') as f:\n",
+        "  for dep in deps_required_for_pipeline:\n",
+        "    f.write(dep + '\\n')\n",
+        "\n",
+        "# Install the pipeline dependencies on Dataflow.\n",
+        "options.view_as(SetupOptions).requirements_file = 
requirements_file_path"
+      ],
+      "metadata": {
+        "id": "lEy4PkluWbdm"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TensorFlow ModelHandler\n",
+        " This example uses `TFModelHandlerTensor` as the model handler and 
the `resnet_101` model trained on imagenet.\n",
+        "\n",
+        " Download the model from [Google Cloud 
Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5)
 (link downloads the model), and place it in the directory that you want to use 
to update your model."
+      ],
+      "metadata": {
+        "id": "_AUNH_GJk_NE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model_handler = TFModelHandlerTensor(\n",
+        "    
model_uri=\"gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+      ],
+      "metadata": {
+        "id": "kkSnsxwUk-Sp"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Pre-process images\n",
+        "\n",
+        "Use `preprocess_image` to run the inference, read the image, and 
convert the image to a TensorFlow tensor."
+      ],
+      "metadata": {
+        "id": "tZH0r0sL-if5"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def preprocess_image(image_name, image_dir):\n",
+        "  img = tf.keras.utils.get_file(image_name, image_dir + 
image_name)\n",
+        "  img = Image.open(img).resize((224, 224))\n",
+        "  img = numpy.array(img) / 255.0\n",
+        "  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), 
dtype=tf.float32)\n",
+        "  return img_tensor"
+      ],
+      "metadata": {
+        "id": "dU5imgTt-8Ne"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "class PostProcessor(beam.DoFn):\n",
+        "  \"\"\"Process the PredictionResult to get the predicted label.\n",
+        "  Returns predicted label.\n",
+        "  \"\"\"\n",
+        "  def process(self, element: PredictionResult) -> Iterable[Tuple[str, 
str]]:\n",
+        "    predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+        "    labels_path = tf.keras.utils.get_file(\n",
+        "        'ImageNetLabels.txt',\n",
+        "        
'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
  # pylint: disable=line-too-long\n",
+        "    )\n",
+        "    imagenet_labels = 
numpy.array(open(labels_path).read().splitlines())\n",
+        "    predicted_class_name = imagenet_labels[predicted_class]\n",
+        "    yield predicted_class_name.title(), element.model_id"
+      ],
+      "metadata": {
+        "id": "6V5tJxO6-gyt"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define the pipeline object.\n",
+        "pipeline = beam.Pipeline(options=options)"
+      ],
+      "metadata": {
+        "id": "GpdKk72O_NXT"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Next, review the pipeline steps and examine the code.\n",
+        "\n",
+        "### Pipeline steps\n"
+      ],
+      "metadata": {
+        "id": "elZ53uxc_9Hv"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "1. Create a `PeriodicImpulse`, which emits output every `n` seconds. 
The `PeriodicImpulse` transform generates an infinite sequence of elements with 
a given runtime interval.\n",

Review Comment:
   ```suggestion
           "1. Create a `PeriodicImpulse` transform, which emits output every 
`n` seconds. The `PeriodicImpulse` transform generates an infinite sequence of 
elements with a given runtime interval.\n",
   ```
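
For readers unfamiliar with the transform, here is a plain-Python sketch of the behavior this sentence describes: an unbounded sequence of timestamps spaced a fixed interval apart. It is an illustration only, not Beam's implementation; `periodic_timestamps` is a hypothetical helper. In the pipeline itself, the interval is passed to `PeriodicImpulse` through its `fire_interval` argument.

```python
import itertools
from typing import Iterator


def periodic_timestamps(start: float, interval: float) -> Iterator[float]:
    """Yield start, start + interval, start + 2 * interval, ... without end."""
    if interval <= 0:
        raise ValueError("interval must be positive")
    for i in itertools.count():
        yield start + i * interval


# The generator never terminates on its own, so take a bounded slice.
first_four = list(itertools.islice(periodic_timestamps(0.0, 5.0), 4))
```

Because the sequence has no end, a consumer has to bound it, which is why the notebook requires streaming mode.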



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\"
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",

Review Comment:
   ```suggestion
           "dataflow_gcs_location = \"gs://BUCKET_NAME/tmp/\"\n",
   ```
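
A related point for whoever replaces the placeholder: the value ends in `/`, so the later `'%s/staging' % dataflow_gcs_location` and `'%s/temp' % dataflow_gcs_location` lines produce a double slash. Here is a minimal sketch of one way to avoid that. It assumes a hypothetical helper `gcs_join` (not part of Apache Beam) and uses the `BUCKET_NAME` placeholder from the suggestion:

```python
def gcs_join(base: str, *parts: str) -> str:
    """Join Cloud Storage path segments with exactly one '/' between them."""
    cleaned = [base.rstrip("/")] + [p.strip("/") for p in parts]
    return "/".join(cleaned)


# BUCKET_NAME is a placeholder; replace it with a real bucket name.
dataflow_gcs_location = "gs://BUCKET_NAME/tmp/"

# What the notebook's string formatting yields when the base ends in '/'.
naive_staging = "%s/staging" % dataflow_gcs_location

# The helper normalizes the separator regardless of the trailing slash.
staging_location = gcs_join(dataflow_gcs_location, "staging")
temp_location = gcs_join(dataflow_gcs_location, "temp")
```

Dropping the trailing slash from the placeholder itself would have the same effect without a helper.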



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\"
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the depencies to the requirements file.\n",
+        "with open(requirements_file_path, 'w') as f:\n",
+        "  for dep in deps_required_for_pipeline:\n",
+        "    f.write(dep + '\\n')\n",
+        "\n",
+        "# Install the pipeline dependencies on Dataflow.\n",
+        "options.view_as(SetupOptions).requirements_file = 
requirements_file_path"
+      ],
+      "metadata": {
+        "id": "lEy4PkluWbdm"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TensorFlow ModelHandler\n",
+        " This example uses `TFModelHandlerTensor` as the model handler and 
the `resnet_101` model trained on imagenet.\n",
+        "\n",
+        " Download the model from [Google Cloud 
Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5)
 (link downloads the model), and place it in the directory that you want to use 
to update your model."
+      ],
+      "metadata": {
+        "id": "_AUNH_GJk_NE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model_handler = TFModelHandlerTensor(\n",
+        "    
model_uri=\"gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+      ],
+      "metadata": {
+        "id": "kkSnsxwUk-Sp"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Pre-process images\n",
+        "\n",
+        "Use `preprocess_image` to run the inference, read the image, and 
convert the image to a TensorFlow tensor."
+      ],
+      "metadata": {
+        "id": "tZH0r0sL-if5"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def preprocess_image(image_name, image_dir):\n",
+        "  img = tf.keras.utils.get_file(image_name, image_dir + 
image_name)\n",
+        "  img = Image.open(img).resize((224, 224))\n",
+        "  img = numpy.array(img) / 255.0\n",
+        "  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), 
dtype=tf.float32)\n",
+        "  return img_tensor"
+      ],
+      "metadata": {
+        "id": "dU5imgTt-8Ne"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "class PostProcessor(beam.DoFn):\n",
+        "  \"\"\"Process the PredictionResult to get the predicted label.\n",
+        "  Returns predicted label.\n",
+        "  \"\"\"\n",
+        "  def process(self, element: PredictionResult) -> Iterable[Tuple[str, 
str]]:\n",
+        "    predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+        "    labels_path = tf.keras.utils.get_file(\n",
+        "        'ImageNetLabels.txt',\n",
+        "        
'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
  # pylint: disable=line-too-long\n",
+        "    )\n",
+        "    imagenet_labels = 
numpy.array(open(labels_path).read().splitlines())\n",
+        "    predicted_class_name = imagenet_labels[predicted_class]\n",
+        "    yield predicted_class_name.title(), element.model_id"
+      ],
+      "metadata": {
+        "id": "6V5tJxO6-gyt"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define the pipeline object.\n",
+        "pipeline = beam.Pipeline(options=options)"
+      ],
+      "metadata": {
+        "id": "GpdKk72O_NXT"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Next, review the pipeline steps and examine the code.\n",
+        "\n",
+        "### Pipeline steps\n"
+      ],
+      "metadata": {
+        "id": "elZ53uxc_9Hv"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "1. Create a `PeriodicImpulse`, which emits output every `n` seconds. 
The `PeriodicImpulse` transform generates an infinite sequence of elements with 
a given runtime interval.\n",
+        "\n",
+        "  In this example, `PeriodicImpulse` mimics the Pub/Sub source. 
Because the inputs in a streaming pipeline arrives in intervals, use 
`PeriodicImpulse` to output elements at `m` intervals.\n",

Review Comment:
   ```suggestion
           "  In this example, `PeriodicImpulse` mimics the Pub/Sub source. 
Because the inputs in a streaming pipeline arrive in intervals, use 
`PeriodicImpulse` to output elements at `m` intervals.\n",
   ```



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\"
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` is used to watch for the file updates matching the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install apache_beam[gcp]>=2.46.0 --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the dependencies to the requirements file.\n",
+        "with open(requirements_file_path, 'w') as f:\n",
+        "  for dep in deps_required_for_pipeline:\n",
+        "    f.write(dep + '\\n')\n",
+        "\n",
+        "# Install the pipeline dependencies on Dataflow.\n",
+        "options.view_as(SetupOptions).requirements_file = 
requirements_file_path"
+      ],
+      "metadata": {
+        "id": "lEy4PkluWbdm"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TensorFlow ModelHandler\n",
+        " This example uses `TFModelHandlerTensor` as the model handler and 
the `resnet_101` model trained on ImageNet.\n",
+        "\n",
+        " Download the model from [Google Cloud 
Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5)
 (link downloads the model), and place it in the directory that you want to use 
to update your model."
+      ],
+      "metadata": {
+        "id": "_AUNH_GJk_NE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model_handler = TFModelHandlerTensor(\n",
+        "    
model_uri=\"gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"

Review Comment:
   ```suggestion
           "    
model_uri=\"gs://BUCKET_NAME/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
   ```
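
A possible follow-up to the placeholder suggestion above: define the bucket name once and derive every Cloud Storage path from it, so a reader only has to replace `BUCKET_NAME` in one place. The sketch below is only an illustration and is not part of the notebook; `gcs_path` is a hypothetical helper, and `BUCKET_NAME` is the placeholder from the suggestion. Joining the parts this way also avoids the doubled slash that a base location ending in `/` produces when it is formatted with `'%s/staging'`.

```python
# Hypothetical helper, not part of the notebook or of Apache Beam.
BUCKET_NAME = "BUCKET_NAME"  # placeholder: replace with your bucket name
MODEL_FILE = "resnet101_weights_tf_dim_ordering_tf_kernels.h5"


def gcs_path(bucket, *parts):
    """Join a bucket name and path parts into a gs:// URI without empty segments."""
    cleaned = [part.strip("/") for part in parts if part.strip("/")]
    return "/".join(["gs://" + bucket.strip("/")] + cleaned)


# Paths used by the notebook, all derived from the single placeholder.
model_uri = gcs_path(BUCKET_NAME, MODEL_FILE)
file_pattern = gcs_path(BUCKET_NAME, "*.h5")
staging_location = gcs_path(BUCKET_NAME, "tmp", "staging")
temp_location = gcs_path(BUCKET_NAME, "tmp", "temp")

print(model_uri)
print(file_pattern)
print(staging_location)
print(temp_location)
```

The resulting strings could then be passed to `TFModelHandlerTensor(model_uri=...)` and to the `GoogleCloudOptions` fields in the same way the notebook already does.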



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\"
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` watches for file updates that match the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install 'apache_beam[gcp]>=2.46.0' --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the dependencies to the requirements file.\n",
+        "with open(requirements_file_path, 'w') as f:\n",
+        "  for dep in deps_required_for_pipeline:\n",
+        "    f.write(dep + '\\n')\n",
+        "\n",
+        "# Install the pipeline dependencies on Dataflow.\n",
+        "options.view_as(SetupOptions).requirements_file = 
requirements_file_path"
+      ],
+      "metadata": {
+        "id": "lEy4PkluWbdm"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TensorFlow ModelHandler\n",
+        " This example uses `TFModelHandlerTensor` as the model handler and 
the `resnet_101` model trained on ImageNet.\n",
+        "\n",
+        " Download the model from [Google Cloud 
Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5)
 (link downloads the model), and place it in the directory that you want to use 
to update your model."
+      ],
+      "metadata": {
+        "id": "_AUNH_GJk_NE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model_handler = TFModelHandlerTensor(\n",
+        "    
model_uri=\"gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+      ],
+      "metadata": {
+        "id": "kkSnsxwUk-Sp"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Pre-process images\n",
+        "\n",
+        "To prepare an image for inference, use `preprocess_image` to read the 
image and convert it to a TensorFlow tensor."
+      ],
+      "metadata": {
+        "id": "tZH0r0sL-if5"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def preprocess_image(image_name, image_dir):\n",
+        "  img = tf.keras.utils.get_file(image_name, image_dir + 
image_name)\n",
+        "  img = Image.open(img).resize((224, 224))\n",
+        "  img = numpy.array(img) / 255.0\n",
+        "  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), 
dtype=tf.float32)\n",
+        "  return img_tensor"
+      ],
+      "metadata": {
+        "id": "dU5imgTt-8Ne"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "class PostProcessor(beam.DoFn):\n",
+        "  \"\"\"Process the PredictionResult to get the predicted label.\n",
+        "  Returns predicted label.\n",
+        "  \"\"\"\n",
+        "  def process(self, element: PredictionResult) -> Iterable[Tuple[str, 
str]]:\n",
+        "    predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+        "    labels_path = tf.keras.utils.get_file(\n",
+        "        'ImageNetLabels.txt',\n",
+        "        
'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
  # pylint: disable=line-too-long\n",
+        "    )\n",
+        "    imagenet_labels = 
numpy.array(open(labels_path).read().splitlines())\n",
+        "    predicted_class_name = imagenet_labels[predicted_class]\n",
+        "    yield predicted_class_name.title(), element.model_id"
+      ],
+      "metadata": {
+        "id": "6V5tJxO6-gyt"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define the pipeline object.\n",
+        "pipeline = beam.Pipeline(options=options)"
+      ],
+      "metadata": {
+        "id": "GpdKk72O_NXT"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Next, review the pipeline steps and examine the code.\n",
+        "\n",
+        "### Pipeline steps\n"
+      ],
+      "metadata": {
+        "id": "elZ53uxc_9Hv"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "1. Create a `PeriodicImpulse`, which emits output every `n` seconds. 
The `PeriodicImpulse` transform generates an infinite sequence of elements with 
a given runtime interval.\n",
+        "\n",
+        "  In this example, `PeriodicImpulse` mimics the Pub/Sub source. 
Because the inputs in a streaming pipeline arrive in intervals, use 
`PeriodicImpulse` to output elements at `m` intervals.\n",
+        "\n",
+        "To learn more about `PeriodicImpulse`, see the [`PeriodicImpulse` 
code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)."
+      ],
+      "metadata": {
+        "id": "305tkV2sAD-S"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "start_timestamp = time.time() # start timestamp of the periodic 
impulse\n",
+        "end_timestamp = start_timestamp + 60 * 20 # end timestamp of the 
periodic impulse.\n",
+        "main_input_fire_interval = 60 # interval at which the main input 
PCollection is emitted.\n",
+        "side_input_fire_interval = 60 # interval at which the side input 
PCollection is emitted.\n",
+        "\n",
+        "periodic_impulse = (\n",
+        "      pipeline\n",
+        "      | \"MainInputPcoll\" >> PeriodicImpulse(\n",
+        "          start_timestamp=start_timestamp,\n",
+        "          stop_timestamp=end_timestamp,\n",
+        "          fire_interval=main_input_fire_interval))"
+      ],
+      "metadata": {
+        "id": "vUFStz66_Tbb"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "2. To read and pre-process the images, use the `read_image` function. 
This example uses `Cat-with-beanie.jpg` for all inferences."
+      ],
+      "metadata": {
+        "id": "8-sal2rFAxP2"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "image_data = (periodic_impulse | beam.Map(lambda x: 
\"Cat-with-beanie.jpg\")\n",
+        "      | \"ReadImage\" >> beam.Map(lambda image_name: read_image(\n",
+        "          image_name=image_name, 
image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))"
+      ],
+      "metadata": {
+        "id": "dGg11TpV_aV6"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "3. Pass the images to the RunInference `PTransform`. RunInference 
takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
+        "  * `model_metadata_pcoll` is a [side 
input](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
`PCollection` to the RunInference `PTransform`. This side input is used to 
update the `model_uri` in the `model_handler` without needing to stop the 
Apache Beam pipeline. Use `WatchFilePattern` as side input to watch a 
`file_pattern` matching `.h5` files. In this case, the `file_pattern` is 
`'gs://your-bucket/*.h5'`.\n",

Review Comment:
   ```suggestion
           "  * `model_metadata_pcoll` is a [side 
input](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
`PCollection` to the RunInference `PTransform`. This side input is used to 
update the `model_uri` in the `model_handler` without needing to stop the 
Apache Beam pipeline. Use `WatchFilePattern` as side input to watch a 
`file_pattern` matching `.h5` files. In this case, the `file_pattern` is 
`'gs://BUCKET_NAME/*.h5'`.\n",
   ```
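
To make the `file_pattern` behavior described in this cell concrete, here is a minimal standard-library sketch of the idea: keep the files that match the glob, then pick the most recently updated one. It is an illustration under stated assumptions and not how `WatchFilePattern` is implemented; `FileInfo`, `latest_match`, and the sample object names and timestamps are hypothetical. Note that `fnmatchcase` lets `*` match `/` as well, which may differ from Cloud Storage glob semantics.

```python
from fnmatch import fnmatchcase
from typing import NamedTuple, Optional, Sequence


class FileInfo(NamedTuple):
    """Hypothetical stand-in for a file listing entry."""
    path: str
    last_updated: float  # seconds since the epoch


def latest_match(files: Sequence[FileInfo],
                 file_pattern: str) -> Optional[FileInfo]:
    """Return the most recently updated file matching the glob, or None."""
    matches = [f for f in files if fnmatchcase(f.path, file_pattern)]
    return max(matches, key=lambda f: f.last_updated, default=None)


# Made-up listing: two model files and one unrelated file.
files = [
    FileInfo("gs://BUCKET_NAME/resnet101.h5", 100.0),
    FileInfo("gs://BUCKET_NAME/resnet152.h5", 200.0),
    FileInfo("gs://BUCKET_NAME/notes.txt", 300.0),
]

latest = latest_match(files, "gs://BUCKET_NAME/*.h5")
print(latest.path if latest else "no matching model file")
```

In this sketch the `.txt` file is ignored even though it is the newest entry, because it does not match the pattern; only the newer of the two `.h5` files is selected.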



##########
beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb:
##########
@@ -0,0 +1,456 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\"
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ],
+      "metadata": {
+        "cellView": "form",
+        "id": "OsFaZscKSPvo"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "# Update ML models in running pipelines"
+      ],
+      "metadata": {
+        "id": "ZUSiAR62SgO8"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "The pipeline in this notebook uses a RunInference `PTransform` to run 
inference on images using TensorFlow models. To update the model, it uses a 
side input `PCollection` that emits `ModelMetadata`.\n",
+        "\n",
+        "You can use side inputs to update your model in real-time, even while 
the Apache Beam pipeline is running. The side input is passed in a 
`ModelHandler` configuration object. You can update the model either by 
leveraging one of Apache Beam's provided patterns, such as the 
`WatchFilePattern`, or by configuring a custom side input `PCollection` that 
defines the logic for the model update.\n",
+        "\n",
+        "For more information about side inputs, see the [Side 
inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
section in the Apache Beam Programming Guide.\n",
+        "\n",
+        "This example uses `WatchFilePattern` as a side input. 
`WatchFilePattern` watches for file updates that match the 
`file_pattern` based on timestamps. It emits the latest `ModelMetadata`, which 
is used in the RunInference `PTransform` to automatically update the ML model 
without stopping the Apache Beam pipeline.\n"
+      ],
+      "metadata": {
+        "id": "tBtqF5UpKJNZ"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Before you begin\n",
+        "Install the dependencies required to run this notebook.\n",
+        "\n",
+        "To use RunInference with side inputs for automatic model updates, 
install `Apache Beam` version `2.46.0` or later."
+      ],
+      "metadata": {
+        "id": "SPuXFowiTpWx"
+      }
+    },
+    {
+      "cell_type": "code",
+      "execution_count": null,
+      "metadata": {
+        "id": "1RyTYsFEIOlA"
+      },
+      "outputs": [],
+      "source": [
+        "!pip install 'apache_beam[gcp]>=2.46.0' --quiet\n",
+        "!pip install tensorflow\n",
+        "!pip install tensorflow_hub"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Imports required for the notebook.\n",
+        "import logging\n",
+        "import time\n",
+        "from typing import Iterable\n",
+        "from typing import Tuple\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import PostProcessor\n",
+        "from apache_beam.examples.inference.tensorflow_imagenet_segmentation 
import read_image\n",
+        "from apache_beam.ml.inference.base import PredictionResult\n",
+        "from apache_beam.ml.inference.base import RunInference\n",
+        "from apache_beam.ml.inference.tensorflow_inference import 
TFModelHandlerTensor\n",
+        "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+        "from apache_beam.options.pipeline_options import 
GoogleCloudOptions\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "from apache_beam.options.pipeline_options import SetupOptions\n",
+        "from apache_beam.options.pipeline_options import StandardOptions\n",
+        "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+      ],
+      "metadata": {
+        "id": "Rs4cwwNrIV9H"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Authenticate to your Google Cloud account.\n",
+        "from google.colab import auth\n",
+        "auth.authenticate_user()"
+      ],
+      "metadata": {
+        "id": "jAKpPcmmGm03"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Runner\n",
+        "\n",
+        "This pipeline runs on the Dataflow Runner. Ensure that you have all 
the required permissions to run the pipeline on Dataflow.\n",
+        "\n",
+        "Configure the pipeline options for the pipeline to run on Dataflow. 
Make sure the pipeline is using streaming mode."
+      ],
+      "metadata": {
+        "id": "ORYNKhH3WQyP"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "options = PipelineOptions()\n",
+        "options.view_as(StandardOptions).streaming = True\n",
+        "\n",
+        "# Provide required pipeline options for the Dataflow Runner.\n",
+        "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+        "\n",
+        "# Set the project to the default project in your current Google Cloud 
environment.\n",
+        "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+        "\n",
+        "# Set the Google Cloud region that you want to run Dataflow in.\n",
+        "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+        "\n",
+        "# IMPORTANT: Update the following line to choose a Cloud Storage 
location.\n",
+        "dataflow_gcs_location = \"gs://your-bucket/tmp/\"\n",
+        "\n",
+        "# The Dataflow staging location. This location is used to stage the 
Dataflow pipeline and the SDK binary.\n",
+        "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % 
dataflow_gcs_location\n",
+        "\n",
+        "# The Dataflow temp location. This location is used to store 
temporary files or intermediate results before outputting to the sink.\n",
+        "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % 
dataflow_gcs_location\n",
+        "\n"
+      ],
+      "metadata": {
+        "id": "wWjbnq6X-4uE"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Install the `tensorflow` and `tensorflow_hub` dependencies on 
Dataflow. Use the `requirements_file` pipeline option to pass these 
dependencies."
+      ],
+      "metadata": {
+        "id": "HTJV8pO2Wcw4"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# In a requirements file, define the dependencies required for the 
pipeline.\n",
+        "deps_required_for_pipeline = ['tensorflow>=2.12.0', 
'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+        "requirements_file_path = './requirements.txt'\n",
+        "# Write the dependencies to the requirements file.\n",
+        "with open(requirements_file_path, 'w') as f:\n",
+        "  for dep in deps_required_for_pipeline:\n",
+        "    f.write(dep + '\\n')\n",
+        "\n",
+        "# Install the pipeline dependencies on Dataflow.\n",
+        "options.view_as(SetupOptions).requirements_file = 
requirements_file_path"
+      ],
+      "metadata": {
+        "id": "lEy4PkluWbdm"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## TensorFlow ModelHandler\n",
+        " This example uses `TFModelHandlerTensor` as the model handler and 
the `resnet_101` model trained on ImageNet.\n",
+        "\n",
+        " Download the model from [Google Cloud 
Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5)
 (link downloads the model), and place it in the directory that you want to use 
to update your model."
+      ],
+      "metadata": {
+        "id": "_AUNH_GJk_NE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "model_handler = TFModelHandlerTensor(\n",
+        "    
model_uri=\"gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+      ],
+      "metadata": {
+        "id": "kkSnsxwUk-Sp"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "## Pre-process images\n",
+        "\n",
+        "To prepare an image for inference, use `preprocess_image` to read the 
image and convert it to a TensorFlow tensor."
+      ],
+      "metadata": {
+        "id": "tZH0r0sL-if5"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "def preprocess_image(image_name, image_dir):\n",
+        "  img = tf.keras.utils.get_file(image_name, image_dir + 
image_name)\n",
+        "  img = Image.open(img).resize((224, 224))\n",
+        "  img = numpy.array(img) / 255.0\n",
+        "  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), 
dtype=tf.float32)\n",
+        "  return img_tensor"
+      ],
+      "metadata": {
+        "id": "dU5imgTt-8Ne"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "class PostProcessor(beam.DoFn):\n",
+        "  \"\"\"Process the PredictionResult to get the predicted label.\n",
+        "  Returns predicted label.\n",
+        "  \"\"\"\n",
+        "  def process(self, element: PredictionResult) -> Iterable[Tuple[str, 
str]]:\n",
+        "    predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+        "    labels_path = tf.keras.utils.get_file(\n",
+        "        'ImageNetLabels.txt',\n",
+        "        
'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'
  # pylint: disable=line-too-long\n",
+        "    )\n",
+        "    imagenet_labels = 
numpy.array(open(labels_path).read().splitlines())\n",
+        "    predicted_class_name = imagenet_labels[predicted_class]\n",
+        "    yield predicted_class_name.title(), element.model_id"
+      ],
+      "metadata": {
+        "id": "6V5tJxO6-gyt"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Define the pipeline object.\n",
+        "pipeline = beam.Pipeline(options=options)"
+      ],
+      "metadata": {
+        "id": "GpdKk72O_NXT"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "Next, review the pipeline steps and examine the code.\n",
+        "\n",
+        "### Pipeline steps\n"
+      ],
+      "metadata": {
+        "id": "elZ53uxc_9Hv"
+      }
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "1. Create a `PeriodicImpulse`, which emits output every `n` seconds. 
The `PeriodicImpulse` transform generates an infinite sequence of elements with 
a given runtime interval.\n",
+        "\n",
+        "  In this example, `PeriodicImpulse` mimics the Pub/Sub source. 
Because the inputs in a streaming pipeline arrive in intervals, use 
`PeriodicImpulse` to output elements every `n` seconds.\n",
+        "\n",
+        "To learn more about `PeriodicImpulse`, see the [`PeriodicImpulse` 
code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)."
+      ],
+      "metadata": {
+        "id": "305tkV2sAD-S"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "start_timestamp = time.time() # start timestamp of the periodic 
impulse\n",
+        "end_timestamp = start_timestamp + 60 * 20 # end timestamp of the 
periodic impulse.\n",
+        "main_input_fire_interval = 60 # interval at which the main input 
PCollection is emitted.\n",
+        "side_input_fire_interval = 60 # interval at which the side input 
PCollection is emitted.\n",
+        "\n",
+        "periodic_impulse = (\n",
+        "      pipeline\n",
+        "      | \"MainInputPcoll\" >> PeriodicImpulse(\n",
+        "          start_timestamp=start_timestamp,\n",
+        "          stop_timestamp=end_timestamp,\n",
+        "          fire_interval=main_input_fire_interval))"
+      ],
+      "metadata": {
+        "id": "vUFStz66_Tbb"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "2. To read and pre-process the images, use the `preprocess_image` 
function. This example uses `Cat-with-beanie.jpg` for all inferences."
+      ],
+      "metadata": {
+        "id": "8-sal2rFAxP2"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "image_data = (periodic_impulse | beam.Map(lambda x: 
\"Cat-with-beanie.jpg\")\n",
+        "      | \"ReadImage\" >> beam.Map(lambda image_name: preprocess_image(\n",
+        "          image_name=image_name, 
image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))"
+      ],
+      "metadata": {
+        "id": "dGg11TpV_aV6"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "3. Pass the images to the RunInference `PTransform`. RunInference 
takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
+        "  * `model_metadata_pcoll` is a [side 
input](https://beam.apache.org/documentation/programming-guide/#side-inputs) 
`PCollection` to the RunInference `PTransform`. This side input is used to 
update the `model_uri` in the `model_handler` without needing to stop the 
Apache Beam pipeline. Use `WatchFilePattern` as a side input to watch a 
`file_pattern` matching `.h5` files. In this case, the `file_pattern` is 
`'gs://your-bucket/*.h5'`.\n",
+        "\n",
+        "  **How to watch for the automatic model update**\n",
+        "\n",
+        "  After the pipeline starts processing data and when you see output 
emitted from the RunInference `PTransform`, upload a `.h5` `TensorFlow` model 
(for example, 
[resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5))
 that matches the `file_pattern` to the Google Cloud Storage bucket. 
RunInference uses `WatchFilePattern` as a side input to update the `model_uri` 
of `TFModelHandlerTensor`.\n"
+      ],
+      "metadata": {
+        "id": "eB0-ewd-BCKE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# The side input used to watch for the .h5 file and update the 
model_uri of the TFModelHandlerTensor.\n",
+        "file_pattern = 'gs://your-bucket/*.h5'\n",
+        "side_input_pcoll = (\n",
+        "      pipeline\n",
+        "      | \"WatchFilePattern\" >> 
WatchFilePattern(file_pattern=file_pattern,\n",
+        "                                                
interval=side_input_fire_interval,\n",
+        "                                                
stop_timestamp=end_timestamp))\n",
+        "inferences = (\n",
+        "     image_data\n",
+        "     | \"ApplyWindowing\" >> 
beam.WindowInto(beam.window.FixedWindows(10))\n",
+        "     | \"RunInference\" >> 
RunInference(model_handler=model_handler,\n",
+        "                                      
model_metadata_pcoll=side_input_pcoll))"
+      ],
+      "metadata": {
+        "id": "_AjvvexJ_hUq"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "4. Post-process the `PredictionResult` object.\n",
+        "\n",
+        "  When the inference is complete, RunInference outputs a 
`PredictionResult` object that contains the fields `example`, `inference`, and 
`model_id`. The `model_id` field is used to identify which model is used for 
running the inference. The `PostProcessor` returns the predicted label and the 
model ID used to run the inference on the predicted label."

Review Comment:
   ```suggestion
           "  When the inference is complete, RunInference outputs a 
`PredictionResult` object that contains the fields `example`, `inference`, and 
`model_id`. The `model_id` field identifies the model used to run the 
inference. The `PostProcessor` returns the predicted label and the ID of the 
model that produced it."
   ```
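
For anyone following this thread without the notebook open, here is a minimal plain-Python sketch of what that sentence describes. `FakePredictionResult`, `post_process`, and the three-entry label list are hypothetical stand-ins for illustration only, not Beam APIs; the real `PredictionResult` comes from `apache_beam.ml.inference.base`, and the notebook's `PostProcessor` uses `numpy.argmax` and the ImageNet label file instead.

```python
from typing import Iterable, NamedTuple, Sequence, Tuple


class FakePredictionResult(NamedTuple):
    """Hypothetical stand-in with the three fields named in the text."""
    example: object
    inference: Sequence[float]
    model_id: str


def post_process(element: FakePredictionResult,
                 labels: Sequence[str]) -> Iterable[Tuple[str, str]]:
    # Pick the index of the highest score (argmax over a 1-D score vector).
    predicted_class = max(range(len(element.inference)),
                          key=element.inference.__getitem__)
    # Emit the label together with the ID of the model that produced it.
    yield labels[predicted_class].title(), element.model_id


# Illustrative labels and scores; these are assumptions, not real model output.
labels = ["tabby cat", "golden retriever", "beanie"]
result = FakePredictionResult(
    example="Cat-with-beanie.jpg",
    inference=[0.7, 0.1, 0.2],
    model_id="gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5",
)
outputs = list(post_process(result, labels))
print(outputs)
```

Because `model_id` travels with each result, downstream steps can tell which outputs came from the model loaded before a side-input update and which came from the one loaded after it.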



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
