This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1a52deb9b9c Auto model updates notebook (#26048)
1a52deb9b9c is described below
commit 1a52deb9b9c0ac38aae1ebe8f085359149ac42c3
Author: Anand Inguva
AuthorDate: Fri Apr 7 08:27:49 2023 -0400
Auto model updates notebook (#26048)
* Created using Colaboratory
* Created using Colaboratory
* Delete beam-ml auto_model_updates_using_side_inputs.ipynb
* beam/examples/notebooks/beam-ml/side_input_model_updates.ipynb
* Created using Colaboratory
* Delete [WIP]Side_Input_model_updates.ipynb
* Delete side_Input_model_updates.ipynb
* Created using Colaboratory
* Created using Colaboratory
* Created using Colaboratory
* Apply suggestions from code review
Co-authored-by: Rebecca Szper
* Apply suggestions from code review
Co-authored-by: Rebecca Szper
* Update beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb
Co-authored-by: Rebecca Szper
* Apply suggestions from code review
Co-authored-by: Danny McCormick
* Created using Colaboratory
* Created using Colaboratory
* Update beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb
Co-authored-by: Danny McCormick
---------
Co-authored-by: Rebecca Szper
Co-authored-by: Danny McCormick
---
.../beam-ml/side_Input_model_updates.ipynb | 475 +++++++++++++++++++++
1 file changed, 475 insertions(+)
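The notebook added in this commit combines a main input from `PeriodicImpulse` with a `WatchFilePattern` side input that tells RunInference which model file to load. As a rough mental model only, the following pure-Python sketch (no Apache Beam required) simulates that interaction: on each main-input tick, it picks the newest uploaded file that matches the pattern and otherwise falls back to the initial model. The functions `newest_matching` and `simulate_model_updates`, the `(timestamp, path)` upload events, and the selection rule are hypothetical simplifications, not Beam's implementation.

```python
from fnmatch import fnmatch
from typing import List, Optional, Tuple

# Hypothetical simplification of a WatchFilePattern-style side input.
# This is NOT Apache Beam's implementation; it only illustrates the idea.


def newest_matching(uploads: List[Tuple[float, str]], pattern: str,
                    now: float) -> Optional[str]:
    """Return the newest path matching `pattern` uploaded at or before `now`."""
    candidates = [(ts, path) for ts, path in uploads
                  if ts <= now and fnmatch(path, pattern)]
    return max(candidates)[1] if candidates else None


def simulate_model_updates(start: float, stop: float, fire_interval: float,
                           initial_model: str,
                           uploads: List[Tuple[float, str]],
                           pattern: str) -> List[Tuple[float, str]]:
    """Return (tick timestamp, model used) for each main-input tick before `stop`."""
    results = []
    ts = start
    while ts < stop:
        latest = newest_matching(uploads, pattern, ts)
        results.append((ts, latest if latest is not None else initial_model))
        ts += fire_interval
    return results


if __name__ == "__main__":
    events = simulate_model_updates(
        start=0, stop=300, fire_interval=60,
        initial_model="gs://BUCKET_NAME/resnet101_weights_tf_dim_ordering_tf_kernels.h5",
        uploads=[(150, "gs://BUCKET_NAME/resnet152_weights_tf_dim_ordering_tf_kernels.h5")],
        pattern="gs://BUCKET_NAME/*.h5")
    # Ticks at 0, 60, and 120 use the ResNet-101 file; ticks at 180 and 240 use ResNet-152.
    for ts, model in events:
        print(ts, model)
```

The sketch ignores windowing and triggering, which the actual streaming pipeline has to handle.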
diff --git a/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb b/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb
new file mode 100644
index 00000000000..c2dbed8f7ca
--- /dev/null
+++ b/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb
@@ -0,0 +1,475 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a href=\"https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "metadata": {
+ "cellView": "form",
+ "id": "OsFaZscKSPvo"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "# Update ML models in running pipelines"
+ ],
+ "metadata": {
+ "id": "ZUSiAR62SgO8"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "The pipeline in this notebook uses a RunInference `PTransform` to run inference on images with TensorFlow models. To update the model, it uses a side input `PCollection` that emits `ModelMetadata`.\n",
+ "\n",
+ "You can use side inputs to update your model in real time, even while the Apache Beam pipeline is running. The side input is passed to the RunInference `PTransform` through its `model_metadata_pcoll` parameter. You can update the model either by using one of the patterns that Apache Beam provides, such as `WatchFilePattern`, or by configuring a custom side input `PCollection` that defines the logic for the model update.\n",
+ "\n",
+ "For more information about side inputs, see the [Side inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) section in the Apache Beam Programming Guide.\n",
+ "\n",
+ "This example uses `WatchFilePattern` as a side input. `WatchFilePattern` watches for files that match `file_pattern` and, based on their timestamps, emits the latest `ModelMetadata`. RunInference uses this `ModelMetadata` to update the ML model automatically without stopping the Apache Beam pipeline.\n"
+ ],
+ "metadata": {
+ "id": "tBtqF5UpKJNZ"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Before you begin\n",
+ "Install the dependencies required to run this notebook.\n",
+ "\n",
+ "To use RunInference with side inputs for automatic model updates, install Apache Beam version `2.46.0` or later."
+ ],
+ "metadata": {
+ "id": "SPuXFowiTpWx"
+ }
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "1RyTYsFEIOlA"
+ },
+ "outputs": [],
+ "source": [
+ "!pip install 'apache_beam[gcp]>=2.46.0' --quiet\n",
+ "!pip install tensorflow\n",
+ "!pip install tensorflow_hub"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Imports required for the notebook.\n",
+ "import logging\n",
+ "import time\n",
+ "from typing import Iterable\n",
+ "from typing import Tuple\n",
+ "\n",
+ "import numpy\n",
+ "import tensorflow as tf\n",
+ "from PIL import Image\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.examples.inference.tensorflow_imagenet_segmentation import read_image\n",
+ "from apache_beam.ml.inference.base import PredictionResult\n",
+ "from apache_beam.ml.inference.base import RunInference\n",
+ "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor\n",
+ "from apache_beam.ml.inference.utils import WatchFilePattern\n",
+ "from apache_beam.options.pipeline_options import GoogleCloudOptions\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "from apache_beam.options.pipeline_options import SetupOptions\n",
+ "from apache_beam.options.pipeline_options import StandardOptions\n",
+ "from apache_beam.transforms.periodicsequence import PeriodicImpulse"
+ ],
+ "metadata": {
+ "id": "Rs4cwwNrIV9H"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Authenticate to your Google Cloud account.\n",
+ "from google.colab import auth\n",
+ "auth.authenticate_user()"
+ ],
+ "metadata": {
+ "id": "jAKpPcmmGm03"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Runner\n",
+ "\n",
+ "This pipeline runs on the Dataflow Runner. Ensure that you have all the required permissions to run the pipeline on Dataflow.\n",
+ "\n",
+ "Configure the pipeline options for the pipeline to run on Dataflow. Make sure the pipeline is using streaming mode."
+ ],
+ "metadata": {
+ "id": "ORYNKhH3WQyP"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "options = PipelineOptions()\n",
+ "options.view_as(StandardOptions).streaming = True\n",
+ "\n",
+ "# Provide required pipeline options for the Dataflow Runner.\n",
+ "options.view_as(StandardOptions).runner = \"DataflowRunner\"\n",
+ "\n",
+ "# Replace 'your-project' with the ID of your Google Cloud project.\n",
+ "options.view_as(GoogleCloudOptions).project = 'your-project'\n",
+ "\n",
+ "# Set the Google Cloud region that you want to run Dataflow in.\n",
+ "options.view_as(GoogleCloudOptions).region = 'us-central1'\n",
+ "\n",
+ "# IMPORTANT: Update the following line to choose a Cloud Storage location.\n",
+ "dataflow_gcs_location = \"gs://BUCKET_NAME/tmp\"\n",
+ "\n",
+ "# The Dataflow staging location. This location is used to stage the Dataflow pipeline and the SDK binary.\n",
+ "options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location\n",
+ "\n",
+ "# The Dataflow temp location. This location is used to store temporary files or intermediate results before outputting to the sink.\n",
+ "options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "wWjbnq6X-4uE"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Install the `tensorflow`, `tensorflow_hub`, and `Pillow` dependencies on the Dataflow workers. Use the `requirements_file` pipeline option to pass these dependencies."
+ ],
+ "metadata": {
+ "id": "HTJV8pO2Wcw4"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define the dependencies that the pipeline needs on the Dataflow workers.\n",
+ "deps_required_for_pipeline = ['tensorflow>=2.12.0', 'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']\n",
+ "requirements_file_path = './requirements.txt'\n",
+ "# Write the dependencies to the requirements file.\n",
+ "with open(requirements_file_path, 'w') as f:\n",
+ "    for dep in deps_required_for_pipeline:\n",
+ "        f.write(dep + '\\n')\n",
+ "\n",
+ "# Install the pipeline dependencies on Dataflow.\n",
+ "options.view_as(SetupOptions).requirements_file = requirements_file_path"
+ ],
+ "metadata": {
+ "id": "lEy4PkluWbdm"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## TensorFlow ModelHandler\n",
+ "This example uses `TFModelHandlerTensor` as the model handler and the ResNet-101 model trained on ImageNet as the initial model for inference.\n",
+ "\n",
+ "Download the model from [Google Cloud Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5) (the link downloads the model), and upload it to the Cloud Storage bucket that you use for model updates, at the path that you pass as `model_uri` to `TFModelHandlerTensor`."
+ ],
+ "metadata": {
+ "id": "_AUNH_GJk_NE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "model_handler = TFModelHandlerTensor(\n",
+ "    model_uri=\"gs://BUCKET_NAME/resnet101_weights_tf_dim_ordering_tf_kernels.h5\")"
+ ],
+ "metadata": {
+ "id": "kkSnsxwUk-Sp"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Pre-process images\n",
+ "\n",
+ "The `preprocess_image` function reads an image, resizes it to 224 x 224 pixels, scales its pixel values to the range [0, 1], and converts it to a TensorFlow tensor that RunInference can use as input."
+ ],
+ "metadata": {
+ "id": "tZH0r0sL-if5"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "def preprocess_image(image_name, image_dir):\n",
+ "  # Download the image, resize it to 224x224, and scale pixel values to [0, 1].\n",
+ "  img = tf.keras.utils.get_file(image_name, image_dir + image_name)\n",
+ "  img = Image.open(img).resize((224, 224))\n",
+ "  img = numpy.array(img) / 255.0\n",
+ "  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), dtype=tf.float32)\n",
+ "  return img_tensor"
+ ],
+ "metadata": {
+ "id": "dU5imgTt-8Ne"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "class PostProcessor(beam.DoFn):\n",
+ "  \"\"\"Process the PredictionResult to get the predicted label.\n",
+ "  Yields the predicted label and the ID of the model that produced it.\n",
+ "  \"\"\"\n",
+ "  def process(self, element: PredictionResult) -> Iterable[Tuple[str, str]]:\n",
+ "    predicted_class = numpy.argmax(element.inference, axis=-1)\n",
+ "    labels_path = tf.keras.utils.get_file(\n",
+ "        'ImageNetLabels.txt',\n",
+ "        'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'  # pylint: disable=line-too-long\n",
+ "    )\n",
+ "    imagenet_labels = numpy.array(open(labels_path).read().splitlines())\n",
+ "    predicted_class_name = imagenet_labels[predicted_class]\n",
+ "    yield predicted_class_name.title(), element.model_id"
+ ],
+ "metadata": {
+ "id": "6V5tJxO6-gyt"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Define the pipeline object.\n",
+ "pipeline = beam.Pipeline(options=options)"
+ ],
+ "metadata": {
+ "id": "GpdKk72O_NXT"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "Next, review the pipeline steps and examine the code.\n",
+ "\n",
+ "### Pipeline steps\n"
+ ],
+ "metadata": {
+ "id": "elZ53uxc_9Hv"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "1. Create a `PeriodicImpulse` transform, which emits an element every `fire_interval` seconds. The `PeriodicImpulse` transform generates a sequence of elements at a fixed interval between `start_timestamp` and `stop_timestamp`.\n",
+ "\n",
+ "   In this example, `PeriodicImpulse` stands in for a streaming source such as Pub/Sub. Because the inputs in a streaming pipeline arrive at intervals, `PeriodicImpulse` outputs an element every `main_input_fire_interval` seconds.\n",
+ "\n",
+ "   To learn more about `PeriodicImpulse`, see the [`PeriodicImpulse` code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150)."
+ ],
+ "metadata": {
+ "id": "305tkV2sAD-S"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "start_timestamp = time.time()  # Start timestamp of the periodic impulse.\n",
+ "end_timestamp = start_timestamp + 60 * 20  # End timestamp of the periodic impulse (runs for 20 minutes).\n",
+ "main_input_fire_interval = 60  # Interval, in seconds, at which the main input PCollection emits elements.\n",
+ "side_input_fire_interval = 60  # Interval, in seconds, at which WatchFilePattern checks for new files.\n",
+ "\n",
+ "periodic_impulse = (\n",
+ "    pipeline\n",
+ "    | \"MainInputPcoll\" >> PeriodicImpulse(\n",
+ "        start_timestamp=start_timestamp,\n",
+ "        stop_timestamp=end_timestamp,\n",
+ "        fire_interval=main_input_fire_interval))"
+ ],
+ "metadata": {
+ "id": "vUFStz66_Tbb"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "2. To read and pre-process the images, use the `preprocess_image` function. This example uses `Cat-with-beanie.jpg` for all inferences, so each element that `PeriodicImpulse` emits is first mapped to that image name.\n",
+ "\n",
+ "   **Note**: The image used for prediction is licensed under CC-BY. The creator is listed in the [LICENSE.txt](https://storage.googleapis.com/apache-beam-samples/image_captioning/LICENSE.txt) file."
+ ],
+ "metadata": {
+ "id": "8-sal2rFAxP2"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "image_data = (\n",
+ "    periodic_impulse\n",
+ "    | \"MapToImageName\" >> beam.Map(lambda _: \"Cat-with-beanie.jpg\")\n",
+ "    | \"ReadImage\" >> beam.Map(lambda image_name: preprocess_image(\n",
+ "        image_name=image_name,\n",
+ "        image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))"
+ ],
+ "metadata": {
+ "id": "dGg11TpV_aV6"
+ },
+ "execution_count": null,
+ "outputs": []
+ {
+ "cell_type": "markdown",
+ "source": [
+ "3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.\n",
+ "   * `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. This side input is used to update the `model_uri` in the `model_handler` without stopping the Apache Beam pipeline. This example uses `WatchFilePattern` as the side input to watch for `.h5` files that match `file_pattern`, which is `'gs://BUCKET_NAME/*.h5'`.\n",
+ "\n"
+ ],
+ "metadata": {
+ "id": "eB0-ewd-BCKE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The side input watches for .h5 files and updates the model_uri of the TFModelHandlerTensor.\n",
+ "file_pattern = 'gs://BUCKET_NAME/*.h5'\n",
+ "side_input_pcoll = (\n",
+ "    pipeline\n",
+ "    | \"WatchFilePattern\" >> WatchFilePattern(\n",
+ "        file_pattern=file_pattern,\n",
+ "        interval=side_input_fire_interval,\n",
+ "        stop_timestamp=end_timestamp))\n",
+ "inferences = (\n",
+ "    image_data\n",
+ "    # Apply fixed 10-second windows to the main input before RunInference.\n",
+ "    | \"ApplyWindowing\" >> beam.WindowInto(beam.window.FixedWindows(10))\n",
+ "    | \"RunInference\" >> RunInference(\n",
+ "        model_handler=model_handler,\n",
+ "        model_metadata_pcoll=side_input_pcoll))"
+ ],
+ "metadata": {
+ "id": "_AjvvexJ_hUq"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "4. Post-process the `PredictionResult` object.\n",
+ "\n",
+ "   When the inference is complete, RunInference outputs a `PredictionResult` object that contains the fields `example`, `inference`, and `model_id`. The `model_id` field identifies the model used to run the inference. The `PostProcessor` yields the predicted label and the ID of the model that produced it."
+ ],
+ "metadata": {
+ "id": "lTA4wRWNDVis"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "post_processor = (\n",
+ " inferences\n",
+ " | \"PostProcessResults\" >> beam.ParDo(PostProcessor())\n",
+ " | \"LogResults\" >> beam.Map(logging.info))"
+ ],
+ "metadata": {
+ "id": "9TB76fo-_vZJ"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**How to watch for the automatic model update**\n",
+ "\n",
+ "After the pipeline starts processing data and you see output emitted from the RunInference `PTransform`, upload a `resnet152` model saved in `.h5` format to a Cloud Storage location that matches the `file_pattern` you defined earlier. You can download a copy of the model by clicking [this link](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5). RunInference uses `WatchFilePattern` as a side i [...]
+ ],
+ "metadata": {
+ "id": "wYp-mBHHjOjA"
+ }
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "## Run the pipeline"
+ ],
+ "metadata": {
+ "id": "_ty03jDnKdKR"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Run the pipeline.\n",
+ "result = pipeline.run().wait_until_finish()"
+ ],
+ "metadata": {
+ "id": "wd0VJLeLEWBU"
+ },
+ "execution_count": null,
+ "outputs": []
+ }
+ ]
+}
\ No newline at end of file
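The notebook's `PostProcessor` turns each `PredictionResult` into a `(label, model_id)` pair, which is how the output shows which model served each prediction. The sketch below reproduces that logic in plain Python so it can be checked without TensorFlow. `FakePredictionResult` is a hypothetical stand-in for Beam's `PredictionResult` (it only borrows the field names `example`, `inference`, and `model_id`), and the short label list is illustrative, not the ImageNet label file.

```python
from typing import Iterator, List, NamedTuple, Sequence, Tuple


class FakePredictionResult(NamedTuple):
    """Hypothetical stand-in for Beam's PredictionResult (same field names)."""
    example: object
    inference: Sequence[float]
    model_id: str


def postprocess(element: FakePredictionResult,
                labels: List[str]) -> Iterator[Tuple[str, str]]:
    """Yield (title-cased predicted label, model ID), like the notebook's PostProcessor."""
    scores = list(element.inference)
    # Index of the highest score; ties resolve to the first index, as numpy.argmax does.
    predicted_class = max(range(len(scores)), key=scores.__getitem__)
    yield labels[predicted_class].title(), element.model_id


if __name__ == "__main__":
    result = FakePredictionResult(
        example=None,
        inference=[0.1, 0.7, 0.2],
        model_id="gs://BUCKET_NAME/resnet152_weights_tf_dim_ordering_tf_kernels.h5")
    # Prints ('Beanie', 'gs://BUCKET_NAME/resnet152_weights_tf_dim_ordering_tf_kernels.h5')
    print(next(postprocess(result, ["tabby cat", "beanie", "golden retriever"])))
```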